Skip to main content

quiche/h3/
mod.rs

1// Copyright (C) 2019, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27//! HTTP/3 wire protocol and QPACK implementation.
28//!
29//! This module provides a high level API for sending and receiving HTTP/3
30//! requests and responses on top of the QUIC transport protocol.
31//!
32//! ## Connection setup
33//!
34//! HTTP/3 connections require a QUIC transport-layer connection, see
35//! [Connection setup] for a full description of the setup process.
36//!
37//! To use HTTP/3, the QUIC connection must be configured with a suitable
38//! Application Layer Protocol Negotiation (ALPN) Protocol ID:
39//!
40//! ```
41//! let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
42//! config.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)?;
43//! # Ok::<(), quiche::Error>(())
44//! ```
45//!
46//! The QUIC handshake is driven by [sending] and [receiving] QUIC packets.
47//!
48//! Once the handshake has completed, the first step in establishing an HTTP/3
49//! connection is creating its configuration object:
50//!
51//! ```
52//! let h3_config = quiche::h3::Config::new()?;
53//! # Ok::<(), quiche::h3::Error>(())
54//! ```
55//!
56//! HTTP/3 client and server connections are both created using the
57//! [`with_transport()`] function, the role is inferred from the type of QUIC
58//! connection:
59//!
60//! ```no_run
61//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
62//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
63//! # let peer = "127.0.0.1:1234".parse().unwrap();
64//! # let local = "127.0.0.1:4321".parse().unwrap();
65//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
66//! # let h3_config = quiche::h3::Config::new()?;
67//! let h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
68//! # Ok::<(), quiche::h3::Error>(())
69//! ```
70//!
71//! ## Sending a request
72//!
73//! An HTTP/3 client can send a request by using the connection's
74//! [`send_request()`] method to queue request headers; [sending] QUIC packets
75//! causes the requests to get sent to the peer:
76//!
77//! ```no_run
78//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
79//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
80//! # let peer = "127.0.0.1:1234".parse().unwrap();
81//! # let local = "127.0.0.1:4321".parse().unwrap();
82//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
83//! # let h3_config = quiche::h3::Config::new()?;
84//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
85//! let req = vec![
86//!     quiche::h3::Header::new(b":method", b"GET"),
87//!     quiche::h3::Header::new(b":scheme", b"https"),
88//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
89//!     quiche::h3::Header::new(b":path", b"/"),
90//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
91//! ];
92//!
93//! h3_conn.send_request(&mut conn, &req, true)?;
94//! # Ok::<(), quiche::h3::Error>(())
95//! ```
96//!
97//! An HTTP/3 client can send a request with additional body data by using
98//! the connection's [`send_body()`] method:
99//!
100//! ```no_run
101//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
102//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
103//! # let peer = "127.0.0.1:1234".parse().unwrap();
104//! # let local = "127.0.0.1:4321".parse().unwrap();
105//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
106//! # let h3_config = quiche::h3::Config::new()?;
107//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
108//! let req = vec![
109//!     quiche::h3::Header::new(b":method", b"GET"),
110//!     quiche::h3::Header::new(b":scheme", b"https"),
111//!     quiche::h3::Header::new(b":authority", b"quic.tech"),
112//!     quiche::h3::Header::new(b":path", b"/"),
113//!     quiche::h3::Header::new(b"user-agent", b"quiche"),
114//! ];
115//!
116//! let stream_id = h3_conn.send_request(&mut conn, &req, false)?;
117//! h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
118//! # Ok::<(), quiche::h3::Error>(())
119//! ```
120//!
121//! ## Handling requests and responses
122//!
123//! After [receiving] QUIC packets, HTTP/3 data is processed using the
124//! connection's [`poll()`] method. On success, this returns an [`Event`] object
125//! and an ID corresponding to the stream where the `Event` originated.
126//!
127//! An HTTP/3 server uses [`poll()`] to read requests and responds to them using
128//! [`send_response()`] and [`send_body()`]:
129//!
130//! ```no_run
131//! use quiche::h3::NameValue;
132//!
133//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
134//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
135//! # let peer = "127.0.0.1:1234".parse().unwrap();
136//! # let local = "127.0.0.1:1234".parse().unwrap();
137//! # let mut conn = quiche::accept(&scid, None, local, peer, &mut config).unwrap();
138//! # let h3_config = quiche::h3::Config::new()?;
139//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
140//! loop {
141//!     match h3_conn.poll(&mut conn) {
142//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
143//!             let mut headers = list.into_iter();
144//!
145//!             // Look for the request's method.
146//!             let method = headers.find(|h| h.name() == b":method").unwrap();
147//!
148//!             // Look for the request's path.
149//!             let path = headers.find(|h| h.name() == b":path").unwrap();
150//!
151//!             if method.value() == b"GET" && path.value() == b"/" {
152//!                 let resp = vec![
153//!                     quiche::h3::Header::new(b":status", 200.to_string().as_bytes()),
154//!                     quiche::h3::Header::new(b"server", b"quiche"),
155//!                 ];
156//!
157//!                 h3_conn.send_response(&mut conn, stream_id, &resp, false)?;
158//!                 h3_conn.send_body(&mut conn, stream_id, b"Hello World!", true)?;
159//!             }
160//!         },
161//!
162//!         Ok((stream_id, quiche::h3::Event::Data)) => {
163//!             // Request body data, handle it.
164//!             # return Ok(());
165//!         },
166//!
167//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
168//!             // Peer terminated stream, handle it.
169//!         },
170//!
171//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
172//!             // Peer reset the stream, handle it.
173//!         },
174//!
175//!         Ok((_flow_id, quiche::h3::Event::PriorityUpdate)) => (),
176//!
177//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
178//!              // Peer signalled it is going away, handle it.
179//!         },
180//!
181//!         Err(quiche::h3::Error::Done) => {
182//!             // Done reading.
183//!             break;
184//!         },
185//!
186//!         Err(e) => {
187//!             // An error occurred, handle it.
188//!             break;
189//!         },
190//!     }
191//! }
192//! # Ok::<(), quiche::h3::Error>(())
193//! ```
194//!
195//! An HTTP/3 client uses [`poll()`] to read responses:
196//!
197//! ```no_run
198//! use quiche::h3::NameValue;
199//!
200//! # let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
201//! # let scid = quiche::ConnectionId::from_ref(&[0xba; 16]);
202//! # let peer = "127.0.0.1:1234".parse().unwrap();
203//! # let local = "127.0.0.1:1234".parse().unwrap();
204//! # let mut conn = quiche::connect(None, &scid, local, peer, &mut config).unwrap();
205//! # let h3_config = quiche::h3::Config::new()?;
206//! # let mut h3_conn = quiche::h3::Connection::with_transport(&mut conn, &h3_config)?;
207//! loop {
208//!     match h3_conn.poll(&mut conn) {
209//!         Ok((stream_id, quiche::h3::Event::Headers{list, more_frames})) => {
210//!             let status = list.iter().find(|h| h.name() == b":status").unwrap();
211//!             println!("Received {} response on stream {}",
212//!                      std::str::from_utf8(status.value()).unwrap(),
213//!                      stream_id);
214//!         },
215//!
216//!         Ok((stream_id, quiche::h3::Event::Data)) => {
217//!             let mut body = vec![0; 4096];
218//!
219//!             // Consume all body data received on the stream.
220//!             while let Ok(read) =
221//!                 h3_conn.recv_body(&mut conn, stream_id, &mut body)
222//!             {
223//!                 println!("Received {} bytes of payload on stream {}",
224//!                          read, stream_id);
225//!             }
226//!         },
227//!
228//!         Ok((stream_id, quiche::h3::Event::Finished)) => {
229//!             // Peer terminated stream, handle it.
230//!         },
231//!
232//!         Ok((stream_id, quiche::h3::Event::Reset(err))) => {
233//!             // Peer reset the stream, handle it.
234//!         },
235//!
236//!         Ok((_prioritized_element_id, quiche::h3::Event::PriorityUpdate)) => (),
237//!
238//!         Ok((goaway_id, quiche::h3::Event::GoAway)) => {
239//!              // Peer signalled it is going away, handle it.
240//!         },
241//!
242//!         Err(quiche::h3::Error::Done) => {
243//!             // Done reading.
244//!             break;
245//!         },
246//!
247//!         Err(e) => {
248//!             // An error occurred, handle it.
249//!             break;
250//!         },
251//!     }
252//! }
253//! # Ok::<(), quiche::h3::Error>(())
254//! ```
255//!
256//! ## Detecting end of request or response
257//!
258//! A single HTTP/3 request or response may consist of several HEADERS and DATA
259//! frames; it is finished when the QUIC stream is closed. Calling [`poll()`]
260//! repeatedly will generate an [`Event`] for each of these. The application may
//! use these events to perform additional HTTP semantic validation.
262//!
263//! ## HTTP/3 protocol errors
264//!
265//! Quiche is responsible for managing the HTTP/3 connection, ensuring it is in
266//! a correct state and validating all messages received by a peer. This mainly
267//! takes place in the [`poll()`] method. If an HTTP/3 error occurs, quiche will
268//! close the connection and send an appropriate CONNECTION_CLOSE frame to the
269//! peer. An [`Error`] is returned to the application so that it can perform any
270//! required tidy up such as closing sockets.
271//!
272//! [`application_proto()`]: ../struct.Connection.html#method.application_proto
273//! [`stream_finished()`]: ../struct.Connection.html#method.stream_finished
274//! [Connection setup]: ../index.html#connection-setup
275//! [sending]: ../index.html#generating-outgoing-packets
276//! [receiving]: ../index.html#handling-incoming-packets
277//! [`with_transport()`]: struct.Connection.html#method.with_transport
278//! [`poll()`]: struct.Connection.html#method.poll
279//! [`Event`]: enum.Event.html
280//! [`Error`]: enum.Error.html
//! [`send_request()`]: struct.Connection.html#method.send_request
282//! [`send_response()`]: struct.Connection.html#method.send_response
283//! [`send_body()`]: struct.Connection.html#method.send_body
284
285use std::collections::HashSet;
286use std::collections::VecDeque;
287
288#[cfg(feature = "sfv")]
289use std::convert::TryFrom;
290use std::fmt;
291use std::fmt::Write;
292
293#[cfg(feature = "qlog")]
294use qlog::events::http3::FrameCreated;
295#[cfg(feature = "qlog")]
296use qlog::events::http3::FrameParsed;
297#[cfg(feature = "qlog")]
298use qlog::events::http3::Http3EventType;
299#[cfg(feature = "qlog")]
300use qlog::events::http3::Http3Frame;
301#[cfg(feature = "qlog")]
302use qlog::events::http3::Initiator;
303#[cfg(feature = "qlog")]
304use qlog::events::http3::StreamType;
305#[cfg(feature = "qlog")]
306use qlog::events::http3::StreamTypeSet;
307#[cfg(feature = "qlog")]
308use qlog::events::EventData;
309#[cfg(feature = "qlog")]
310use qlog::events::EventImportance;
311#[cfg(feature = "qlog")]
312use qlog::events::EventType;
313
314use crate::buffers::BufFactory;
315use crate::BufSplit;
316
/// List of ALPN tokens of supported HTTP/3 versions.
///
/// The list currently holds the single token `h3`.
///
/// This can be passed directly to the [`Config::set_application_protos()`]
/// method when implementing HTTP/3 applications.
///
/// [`Config::set_application_protos()`]:
/// ../struct.Config.html#method.set_application_protos
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
325
// The offset used when converting HTTP/3 urgency to quiche urgency.
const PRIORITY_URGENCY_OFFSET: u8 = 124;

// Parameter values as specified in [Extensible Priorities].
//
// Valid urgency values span the inclusive range 0 through 7, where the
// upper bound is the lowest priority. The `*_DEFAULT` values are the ones
// used by `Priority::default()`.
//
// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
336
// Pre-built qlog event type values for the HTTP/3 "frame created",
// "frame parsed" and "stream type set" events. They are only compiled when
// the `qlog` feature is enabled.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
    EventType::Http3EventType(Http3EventType::FrameCreated);
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
    EventType::Http3EventType(Http3EventType::FrameParsed);
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
    EventType::Http3EventType(Http3EventType::StreamTypeSet);
346
/// A specialized [`Result`] type for quiche HTTP/3 operations.
///
/// This type is used throughout quiche's HTTP/3 public API for any operation
/// that can produce an error. It is shorthand for
/// `std::result::Result<T, Error>`, with the error type fixed to this
/// module's [`Error`].
///
/// [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
/// [`Error`]: enum.Error.html
pub type Result<T> = std::result::Result<T, Error>;
354
/// An HTTP/3 error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
    /// There is no error or no work to do.
    Done,

    /// The provided buffer is too short.
    BufferTooShort,

    /// Internal error in the HTTP/3 stack.
    InternalError,

    /// Endpoint detected that the peer is exhibiting behavior that causes
    /// excessive load.
    ExcessiveLoad,

    /// A Stream ID or Push ID was used incorrectly, such as exceeding a
    /// limit, reducing a limit, or being reused.
    IdError,

    /// The endpoint detected that its peer created a stream that it will not
    /// accept.
    StreamCreationError,

    /// A required critical stream was closed.
    ClosedCriticalStream,

    /// No SETTINGS frame at beginning of control stream.
    MissingSettings,

    /// A frame was received which is not permitted in the current state.
    FrameUnexpected,

    /// Frame violated layout or size rules.
    FrameError,

    /// QPACK Header block decompression failure.
    QpackDecompressionFailed,

    /// Error originated from the transport layer.
    ///
    /// The wrapped value is the underlying QUIC error.
    TransportError(crate::Error),

    /// The underlying QUIC stream (or connection) doesn't have enough capacity
    /// for the operation to complete. The application should retry later on.
    StreamBlocked,

    /// Error in the payload of a SETTINGS frame.
    SettingsError,

    /// Server rejected request.
    RequestRejected,

    /// Request or its response cancelled.
    RequestCancelled,

    /// Client's request stream terminated without containing a fully formed
    /// request.
    RequestIncomplete,

    /// An HTTP message was malformed and cannot be processed.
    MessageError,

    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed.
    ConnectError,

    /// The requested operation cannot be served over HTTP/3. Peer should retry
    /// over HTTP/1.1.
    VersionFallback,
}
426
/// HTTP/3 error codes sent on the wire.
///
/// As defined in [RFC9114](https://www.rfc-editor.org/rfc/rfc9114.html#http-error-codes).
/// Each variant's discriminant is the numeric code registered for the
/// correspondingly named `H3_*` error.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
    /// No error. This is used when the connection or stream needs to be closed,
    /// but there is no error to signal (`H3_NO_ERROR`).
    NoError              = 0x100,
    /// Peer violated protocol requirements in a way that does not match a more
    /// specific error code or endpoint declines to use the more specific
    /// error code (`H3_GENERAL_PROTOCOL_ERROR`).
    GeneralProtocolError = 0x101,
    /// An internal error has occurred in the HTTP stack
    /// (`H3_INTERNAL_ERROR`).
    InternalError        = 0x102,
    /// The endpoint detected that its peer created a stream that it will not
    /// accept (`H3_STREAM_CREATION_ERROR`).
    StreamCreationError  = 0x103,
    /// A stream required by the HTTP/3 connection was closed or reset
    /// (`H3_CLOSED_CRITICAL_STREAM`).
    ClosedCriticalStream = 0x104,
    /// A frame was received that was not permitted in the current state or on
    /// the current stream (`H3_FRAME_UNEXPECTED`).
    FrameUnexpected      = 0x105,
    /// A frame that fails to satisfy layout requirements or with an invalid
    /// size was received (`H3_FRAME_ERROR`).
    FrameError           = 0x106,
    /// The endpoint detected that its peer is exhibiting a behavior that might
    /// be generating excessive load (`H3_EXCESSIVE_LOAD`).
    ExcessiveLoad        = 0x107,
    /// A stream ID or push ID was used incorrectly, such as exceeding a limit,
    /// reducing a limit, or being reused (`H3_ID_ERROR`).
    IdError              = 0x108,
    /// An endpoint detected an error in the payload of a SETTINGS frame
    /// (`H3_SETTINGS_ERROR`).
    SettingsError        = 0x109,
    /// No SETTINGS frame was received at the beginning of the control stream
    /// (`H3_MISSING_SETTINGS`).
    MissingSettings      = 0x10a,
    /// A server rejected a request without performing any application
    /// processing (`H3_REQUEST_REJECTED`).
    RequestRejected      = 0x10b,
    /// The request or its response (including pushed response) is cancelled
    /// (`H3_REQUEST_CANCELLED`).
    RequestCancelled     = 0x10c,
    /// The client's stream terminated without containing a fully formed
    /// request (`H3_REQUEST_INCOMPLETE`).
    RequestIncomplete    = 0x10d,
    /// An HTTP message was malformed and cannot be processed
    /// (`H3_MESSAGE_ERROR`).
    MessageError         = 0x10e,
    /// The TCP connection established in response to a CONNECT request was
    /// reset or abnormally closed (`H3_CONNECT_ERROR`).
    ConnectError         = 0x10f,
    /// The requested operation cannot be served over HTTP/3. The peer should
    /// retry over HTTP/1.1 (`H3_VERSION_FALLBACK`).
    VersionFallback      = 0x110,
}
479
480impl Error {
481    fn to_wire(self) -> u64 {
482        match self {
483            Error::Done => WireErrorCode::NoError as u64,
484            Error::InternalError => WireErrorCode::InternalError as u64,
485            Error::StreamCreationError =>
486                WireErrorCode::StreamCreationError as u64,
487            Error::ClosedCriticalStream =>
488                WireErrorCode::ClosedCriticalStream as u64,
489            Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
490            Error::FrameError => WireErrorCode::FrameError as u64,
491            Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
492            Error::IdError => WireErrorCode::IdError as u64,
493            Error::MissingSettings => WireErrorCode::MissingSettings as u64,
494            Error::QpackDecompressionFailed => 0x200,
495            Error::BufferTooShort => 0x999,
496            Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
497            Error::SettingsError => WireErrorCode::SettingsError as u64,
498            Error::RequestRejected => WireErrorCode::RequestRejected as u64,
499            Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
500            Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
501            Error::MessageError => WireErrorCode::MessageError as u64,
502            Error::ConnectError => WireErrorCode::ConnectError as u64,
503            Error::VersionFallback => WireErrorCode::VersionFallback as u64,
504        }
505    }
506
507    #[cfg(feature = "ffi")]
508    fn to_c(self) -> libc::ssize_t {
509        match self {
510            Error::Done => -1,
511            Error::BufferTooShort => -2,
512            Error::InternalError => -3,
513            Error::ExcessiveLoad => -4,
514            Error::IdError => -5,
515            Error::StreamCreationError => -6,
516            Error::ClosedCriticalStream => -7,
517            Error::MissingSettings => -8,
518            Error::FrameUnexpected => -9,
519            Error::FrameError => -10,
520            Error::QpackDecompressionFailed => -11,
521            // -12 was previously used for TransportError, skip it
522            Error::StreamBlocked => -13,
523            Error::SettingsError => -14,
524            Error::RequestRejected => -15,
525            Error::RequestCancelled => -16,
526            Error::RequestIncomplete => -17,
527            Error::MessageError => -18,
528            Error::ConnectError => -19,
529            Error::VersionFallback => -20,
530
531            Error::TransportError(quic_error) => quic_error.to_c() - 1000,
532        }
533    }
534}
535
536impl fmt::Display for Error {
537    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
538        write!(f, "{self:?}")
539    }
540}
541
impl std::error::Error for Error {
    /// Always returns `None`: no underlying source is exposed.
    ///
    /// A wrapped transport error is still visible through the `Debug` and
    /// `Display` output of `Error::TransportError`.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}
547
548impl From<super::Error> for Error {
549    fn from(err: super::Error) -> Self {
550        match err {
551            super::Error::Done => Error::Done,
552
553            _ => Error::TransportError(err),
554        }
555    }
556}
557
558impl From<octets::BufferTooShortError> for Error {
559    fn from(_err: octets::BufferTooShortError) -> Self {
560        Error::BufferTooShort
561    }
562}
563
/// An HTTP/3 configuration.
///
/// Every field starts out as `None` and is filled in by the matching setter
/// method.
pub struct Config {
    /// Value for `SETTINGS_MAX_FIELD_SECTION_SIZE`, if set.
    max_field_section_size: Option<u64>,
    /// Value for `SETTINGS_QPACK_MAX_TABLE_CAPACITY`, if set.
    qpack_max_table_capacity: Option<u64>,
    /// Value for `SETTINGS_QPACK_BLOCKED_STREAMS`, if set.
    qpack_blocked_streams: Option<u64>,
    /// `Some(1)` when extended CONNECT has been enabled, `None` otherwise.
    connect_protocol_enabled: Option<u64>,
    /// additional settings are settings that are not part of the H3
    /// settings explicitly handled above
    additional_settings: Option<Vec<(u64, u64)>>,
}
574
575impl Config {
576    /// Creates a new configuration object with default settings.
577    pub const fn new() -> Result<Config> {
578        Ok(Config {
579            max_field_section_size: None,
580            qpack_max_table_capacity: None,
581            qpack_blocked_streams: None,
582            connect_protocol_enabled: None,
583            additional_settings: None,
584        })
585    }
586
587    /// Sets the `SETTINGS_MAX_FIELD_SECTION_SIZE` setting.
588    ///
589    /// By default no limit is enforced. When a request whose headers exceed
590    /// the limit set by the application is received, the call to the [`poll()`]
591    /// method will return the [`Error::ExcessiveLoad`] error, and the
592    /// connection will be closed.
593    ///
594    /// [`poll()`]: struct.Connection.html#method.poll
595    /// [`Error::ExcessiveLoad`]: enum.Error.html#variant.ExcessiveLoad
596    pub fn set_max_field_section_size(&mut self, v: u64) {
597        self.max_field_section_size = Some(v);
598    }
599
600    /// Sets the `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting.
601    ///
602    /// The default value is `0`.
603    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
604        self.qpack_max_table_capacity = Some(v);
605    }
606
607    /// Sets the `SETTINGS_QPACK_BLOCKED_STREAMS` setting.
608    ///
609    /// The default value is `0`.
610    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
611        self.qpack_blocked_streams = Some(v);
612    }
613
614    /// Sets or omits the `SETTINGS_ENABLE_CONNECT_PROTOCOL` setting.
615    ///
616    /// The default value is `false`.
617    pub fn enable_extended_connect(&mut self, enabled: bool) {
618        if enabled {
619            self.connect_protocol_enabled = Some(1);
620        } else {
621            self.connect_protocol_enabled = None;
622        }
623    }
624
625    /// Sets additional HTTP/3 settings.
626    ///
627    /// The default value is no additional settings.
628    /// The `additional_settings` parameter must not the following
629    /// settings as they are already handled by this library:
630    ///
631    /// - SETTINGS_QPACK_MAX_TABLE_CAPACITY
632    /// - SETTINGS_MAX_FIELD_SECTION_SIZE
633    /// - SETTINGS_QPACK_BLOCKED_STREAMS
634    /// - SETTINGS_ENABLE_CONNECT_PROTOCOL
635    /// - SETTINGS_H3_DATAGRAM
636    ///
637    /// If such a setting is present in the `additional_settings`,
638    /// the method will return the [`Error::SettingsError`] error.
639    ///
640    /// If a setting identifier is present twice in `additional_settings`,
641    /// the method will return the [`Error::SettingsError`] error.
642    ///
643    /// [`Error::SettingsError`]: enum.Error.html#variant.SettingsError
644    pub fn set_additional_settings(
645        &mut self, additional_settings: Vec<(u64, u64)>,
646    ) -> Result<()> {
647        let explicit_quiche_settings = HashSet::from([
648            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
649            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
650            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
651            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
652            frame::SETTINGS_H3_DATAGRAM,
653            frame::SETTINGS_H3_DATAGRAM_00,
654        ]);
655
656        let dedup_settings: HashSet<u64> =
657            additional_settings.iter().map(|(key, _)| *key).collect();
658
659        if dedup_settings.len() != additional_settings.len() ||
660            !explicit_quiche_settings.is_disjoint(&dedup_settings)
661        {
662            return Err(Error::SettingsError);
663        }
664        self.additional_settings = Some(additional_settings);
665        Ok(())
666    }
667}
668
/// A trait for types that expose a name and a value as raw bytes.
pub trait NameValue {
    /// Returns the object's name.
    fn name(&self) -> &[u8];

    /// Returns the object's value.
    fn value(&self) -> &[u8];
}

/// Any pair of byte-like items is a name-value pair: the first element is
/// the name and the second one is the value.
impl<N, V> NameValue for (N, V)
where
    N: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    fn name(&self) -> &[u8] {
        let (name, _) = self;
        name.as_ref()
    }

    fn value(&self) -> &[u8] {
        let (_, value) = self;
        value.as_ref()
    }
}
691
/// An owned name-value pair representing a raw HTTP header.
///
/// The first element is the header name and the second one its value; both
/// are stored as raw bytes.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);

/// Writes `hdr` to `f` in a human-friendly form.
///
/// Valid UTF-8 input is written as text with `str::escape_default()`
/// applied; anything else is written as the `Debug` form of the byte slice.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // The escaping iterator implements `Display`, so it is streamed
        // straight into the formatter instead of being collected into a
        // temporary `String` first.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}

impl fmt::Debug for Header {
    /// Formats the header as `"name: value"`, including the surrounding
    /// double quotes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_char('"')?;
        try_print_as_readable(&self.0, f)?;
        f.write_str(": ")?;
        try_print_as_readable(&self.1, f)?;
        f.write_char('"')
    }
}

impl Header {
    /// Creates a new header.
    ///
    /// Both `name` and `value` will be cloned.
    pub fn new(name: &[u8], value: &[u8]) -> Self {
        Self(name.to_vec(), value.to_vec())
    }
}
721
722impl NameValue for Header {
723    fn name(&self) -> &[u8] {
724        &self.0
725    }
726
727    fn value(&self) -> &[u8] {
728        &self.1
729    }
730}
731
/// A non-owned name-value pair representing a raw HTTP header.
///
/// Both parts are borrowed byte slices: the name first, then the value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);

impl<'a> HeaderRef<'a> {
    /// Creates a new header that borrows `name` and `value`.
    pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
        HeaderRef(name, value)
    }
}
742
743impl NameValue for HeaderRef<'_> {
744    fn name(&self) -> &[u8] {
745        self.0
746    }
747
748    fn value(&self) -> &[u8] {
749        self.1
750    }
751}
752
/// An HTTP/3 connection event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Request/response headers were received.
    Headers {
        /// The list of received header fields. The application should validate
        /// pseudo-headers and headers.
        list: Vec<Header>,

        /// Whether more frames will follow the headers on the stream.
        more_frames: bool,
    },

    /// Data was received.
    ///
    /// This indicates that the application can use the [`recv_body()`] method
    /// to retrieve the data from the stream.
    ///
    /// Note that [`recv_body()`] will need to be called repeatedly until the
    /// [`Done`] value is returned, as the event will not be re-armed until all
    /// buffered data is read.
    ///
    /// [`recv_body()`]: struct.Connection.html#method.recv_body
    /// [`Done`]: enum.Error.html#variant.Done
    Data,

    /// Stream was closed.
    Finished,

    /// Stream was reset.
    ///
    /// The associated data represents the error code sent by the peer.
    Reset(u64),

    /// PRIORITY_UPDATE was received.
    ///
    /// This indicates that the application can use the
    /// [`take_last_priority_update()`] method to take the last received
    /// PRIORITY_UPDATE for a specified stream.
    ///
    /// This event is triggered once per stream until the last PRIORITY_UPDATE
    /// is taken. It is recommended that applications defer taking the
    /// PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
    ///
    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
    /// [`poll()`]: struct.Connection.html#method.poll
    /// [`Done`]: enum.Error.html#variant.Done
    PriorityUpdate,

    /// GOAWAY was received.
    GoAway,
}
805
/// Extensible Priorities parameters.
///
/// The `TryFrom` trait supports constructing this object from the serialized
/// Structured Fields Dictionary field value. I.e., use `TryFrom` to parse the
/// value of a Priority header field or a PRIORITY_UPDATE frame. Using this
/// trait requires the `sfv` feature to be enabled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
    // The urgency (`u`) parameter.
    urgency: u8,
    // The incremental (`i`) parameter.
    incremental: bool,
}
818
819impl Default for Priority {
820    fn default() -> Self {
821        Priority {
822            urgency: PRIORITY_URGENCY_DEFAULT,
823            incremental: PRIORITY_INCREMENTAL_DEFAULT,
824        }
825    }
826}
827
828impl Priority {
829    /// Creates a new Priority.
830    pub const fn new(urgency: u8, incremental: bool) -> Self {
831        Priority {
832            urgency,
833            incremental,
834        }
835    }
836}
837
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = Error;

    /// Try to parse an Extensible Priority field value.
    ///
    /// The field value is expected to be a Structured Fields Dictionary; see
    /// [Extensible Priorities].
    ///
    /// If the `u` or `i` fields are contained with correct types, a constructed
    /// Priority object is returned. Note that urgency values outside of valid
    /// range (0 through 7) are clamped to 7.
    ///
    /// [`Error::Done`] is returned when:
    ///   * the value cannot be parsed as a Dictionary,
    ///   * `u` is present but is not an Integer item (including when it is
    ///     an inner list), or
    ///   * `i` is present as an item that is not a Boolean.
    ///
    /// Omitted parameters will yield default values. An `i` member that is
    /// an inner list rather than an item is ignored and also yields the
    /// default value.
    ///
    /// [Extensible Priorities]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4
    /// [`Error::Done`]: enum.Error.html#variant.Done
    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // If there is a u parameter, try to read it as an Item of type
            // Integer. If the value is out of the spec's allowed range
            // (0 through 7), that's an error so set it to the upper
            // bound (lowest priority) to avoid interference with
            // other streams.
            Some(sfv::ListEntry::Item(item)) => {
                let v = item.bare_item.as_int().ok_or(Error::Done)?;

                // The checked conversion rejects negative and oversized
                // integers without a truncating cast; the filter rejects
                // whatever still falls outside the allowed bounds.
                u8::try_from(v)
                    .ok()
                    .filter(|u| {
                        (PRIORITY_URGENCY_LOWER_BOUND..=
                            PRIORITY_URGENCY_UPPER_BOUND)
                            .contains(u)
                    })
                    .unwrap_or(PRIORITY_URGENCY_UPPER_BOUND)
            },

            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            // Omitted so use default value.
            None => PRIORITY_URGENCY_DEFAULT,
        };

        let incremental = match dict.get("i") {
            Some(sfv::ListEntry::Item(item)) =>
                item.bare_item.as_bool().ok_or(Error::Done)?,

            // Omitted (or not an item) so use the same default value as
            // `Priority::default()`.
            _ => PRIORITY_INCREMENTAL_DEFAULT,
        };

        Ok(Priority::new(urgency, incremental))
    }
}
903
// HTTP/3 settings tracked for one endpoint. `Connection` keeps one instance
// for the local side (filled from `Config`) and one for the peer (every
// field starts out as `None` / default).
struct ConnectionSettings {
    // Configured value on the local side; `None` when not set.
    pub max_field_section_size: Option<u64>,
    // Configured value on the local side; `None` when not set.
    pub qpack_max_table_capacity: Option<u64>,
    // Configured value on the local side; `None` when not set.
    pub qpack_blocked_streams: Option<u64>,
    // Configured value on the local side; `None` when not set.
    pub connect_protocol_enabled: Option<u64>,
    // On the local side this is `Some(1)` when datagrams are enabled on the
    // QUIC connection, `None` otherwise.
    pub h3_datagram: Option<u64>,
    // Extra settings copied from `Config::additional_settings` on the local
    // side. NOTE(review): presumably (identifier, value) pairs -- confirm.
    pub additional_settings: Option<Vec<(u64, u64)>>,
    // NOTE(review): looks like the settings in unparsed (identifier, value)
    // form; it is only default-initialized in the code visible here --
    // confirm against where it is populated.
    pub raw: Option<Vec<(u64, u64)>>,
}
913
// Bookkeeping for the QPACK encoder and decoder streams of one endpoint.
// `Connection` keeps one instance for the local side and one for the peer.
// The derived `Default` leaves both IDs as `None` and both counters at 0.
#[derive(Default)]
struct QpackStreams {
    // ID of the QPACK encoder stream, once one is known.
    pub encoder_stream_id: Option<u64>,
    // Byte counter for the encoder stream. NOTE(review): presumably the
    // source of the `Stats` "received bytes" values -- confirm.
    pub encoder_stream_bytes: u64,
    // ID of the QPACK decoder stream, once one is known.
    pub decoder_stream_id: Option<u64>,
    // Byte counter for the decoder stream (see the note on
    // `encoder_stream_bytes`).
    pub decoder_stream_bytes: u64,
}
921
/// Statistics about the connection.
///
/// A connection's statistics can be collected using the [`stats()`] method.
///
/// The type implements `Debug` so that a snapshot can be logged or printed
/// directly, e.g. with `{:?}`.
///
/// [`stats()`]: struct.Connection.html#method.stats
#[derive(Clone, Debug, Default)]
pub struct Stats {
    /// The number of bytes received on the QPACK encoder stream.
    pub qpack_encoder_stream_recv_bytes: u64,
    /// The number of bytes received on the QPACK decoder stream.
    pub qpack_decoder_stream_recv_bytes: u64,
}
934
935fn close_conn_critical_stream<F: BufFactory>(
936    conn: &mut super::Connection<F>,
937) -> Result<()> {
938    conn.close(
939        true,
940        Error::ClosedCriticalStream.to_wire(),
941        b"Critical stream closed.",
942    )?;
943
944    Err(Error::ClosedCriticalStream)
945}
946
947fn close_conn_if_critical_stream_finished<F: BufFactory>(
948    conn: &mut super::Connection<F>, stream_id: u64,
949) -> Result<()> {
950    if conn.stream_finished(stream_id) {
951        close_conn_critical_stream(conn)?;
952    }
953
954    Ok(())
955}
956
/// An HTTP/3 connection.
pub struct Connection {
    // Role of the local endpoint, as passed to `new()`.
    is_server: bool,

    // ID used for the next locally-initiated request stream. Starts at 0 and
    // is advanced by 4 after each successfully buffered request.
    next_request_stream_id: u64,
    // ID used for the next locally-opened unidirectional stream. Starts at
    // 0x3 on servers and 0x2 on clients.
    next_uni_stream_id: u64,

    // Per-stream HTTP/3 state, keyed by stream ID.
    streams: crate::stream::StreamIdHashMap<stream::Stream>,

    // Settings of the local endpoint, taken from `Config`.
    local_settings: ConnectionSettings,
    // Settings of the peer; all unset when the connection is created.
    peer_settings: ConnectionSettings,

    // IDs of the local and peer control streams; `None` at creation.
    control_stream_id: Option<u64>,
    peer_control_stream_id: Option<u64>,

    // QPACK state used to encode outgoing and decode incoming header blocks.
    qpack_encoder: qpack::Encoder,
    qpack_decoder: qpack::Decoder,

    // QPACK encoder/decoder stream bookkeeping for each side.
    local_qpack_streams: QpackStreams,
    peer_qpack_streams: QpackStreams,

    // Starts at 0. NOTE(review): presumably the highest push ID permitted by
    // MAX_PUSH_ID -- confirm against the frame handling code.
    max_push_id: u64,

    // Queue of stream IDs; empty at creation. NOTE(review): presumably
    // streams whose `Finished` event is still to be reported -- confirm.
    finished_streams: VecDeque<u64>,

    // Set once GREASE frames have been sent, so they are only sent a single
    // time per connection.
    frames_greased: bool,

    // GOAWAY identifiers for each direction; `None` at creation. Once
    // `peer_goaway_id` is set, no new requests may be initiated.
    local_goaway_id: Option<u64>,
    peer_goaway_id: Option<u64>,
}
987
988impl Connection {
989    fn new(
990        config: &Config, is_server: bool, enable_dgram: bool,
991    ) -> Result<Connection> {
992        let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
993        let h3_datagram = if enable_dgram { Some(1) } else { None };
994
995        Ok(Connection {
996            is_server,
997
998            next_request_stream_id: 0,
999
1000            next_uni_stream_id: initial_uni_stream_id,
1001
1002            streams: Default::default(),
1003
1004            local_settings: ConnectionSettings {
1005                max_field_section_size: config.max_field_section_size,
1006                qpack_max_table_capacity: config.qpack_max_table_capacity,
1007                qpack_blocked_streams: config.qpack_blocked_streams,
1008                connect_protocol_enabled: config.connect_protocol_enabled,
1009                h3_datagram,
1010                additional_settings: config.additional_settings.clone(),
1011                raw: Default::default(),
1012            },
1013
1014            peer_settings: ConnectionSettings {
1015                max_field_section_size: None,
1016                qpack_max_table_capacity: None,
1017                qpack_blocked_streams: None,
1018                h3_datagram: None,
1019                connect_protocol_enabled: None,
1020                additional_settings: Default::default(),
1021                raw: Default::default(),
1022            },
1023
1024            control_stream_id: None,
1025            peer_control_stream_id: None,
1026
1027            qpack_encoder: qpack::Encoder::new(),
1028            qpack_decoder: qpack::Decoder::new(),
1029
1030            local_qpack_streams: Default::default(),
1031            peer_qpack_streams: Default::default(),
1032
1033            max_push_id: 0,
1034
1035            finished_streams: VecDeque::new(),
1036
1037            frames_greased: false,
1038
1039            local_goaway_id: None,
1040            peer_goaway_id: None,
1041        })
1042    }
1043
1044    /// Creates a new HTTP/3 connection using the provided QUIC connection.
1045    ///
1046    /// This will also initiate the HTTP/3 handshake with the peer by opening
1047    /// all control streams (including QPACK) and sending the local settings.
1048    ///
1049    /// On success the new connection is returned.
1050    ///
1051    /// The [`StreamLimit`] error is returned when the HTTP/3 control stream
1052    /// cannot be created due to stream limits.
1053    ///
1054    /// The [`InternalError`] error is returned when either the underlying QUIC
1055    /// connection is not in a suitable state, or the HTTP/3 control stream
1056    /// cannot be created due to flow control limits.
1057    ///
1058    /// [`StreamLimit`]: ../enum.Error.html#variant.StreamLimit
1059    /// [`InternalError`]: ../enum.Error.html#variant.InternalError
1060    pub fn with_transport<F: BufFactory>(
1061        conn: &mut super::Connection<F>, config: &Config,
1062    ) -> Result<Connection> {
1063        let is_client = !conn.is_server;
1064        if is_client && !(conn.is_established() || conn.is_in_early_data()) {
1065            trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
1066            return Err(Error::InternalError);
1067        }
1068
1069        let mut http3_conn =
1070            Connection::new(config, conn.is_server, conn.dgram_enabled())?;
1071
1072        match http3_conn.send_settings(conn) {
1073            Ok(_) => (),
1074
1075            Err(e) => {
1076                conn.close(true, e.to_wire(), b"Error opening control stream")?;
1077                return Err(e);
1078            },
1079        };
1080
1081        // Try opening QPACK streams, but ignore errors if it fails since we
1082        // don't need them right now.
1083        http3_conn.open_qpack_encoder_stream(conn).ok();
1084        http3_conn.open_qpack_decoder_stream(conn).ok();
1085
1086        if conn.grease {
1087            // Try opening a GREASE stream, but ignore errors since it's not
1088            // critical.
1089            http3_conn.open_grease_stream(conn).ok();
1090        }
1091
1092        Ok(http3_conn)
1093    }
1094
1095    /// Sends an HTTP/3 request.
1096    ///
1097    /// The request is encoded from the provided list of headers without a
1098    /// body, and sent on a newly allocated stream. To include a body,
1099    /// set `fin` as `false` and subsequently call [`send_body()`] with the
1100    /// same `conn` and the `stream_id` returned from this method.
1101    ///
1102    /// On success the newly allocated stream ID is returned.
1103    ///
1104    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1105    /// doesn't have enough capacity for the operation to complete. When this
1106    /// happens the application should retry the operation once the stream is
1107    /// reported as writable again.
1108    ///
1109    /// [`send_body()`]: struct.Connection.html#method.send_body
1110    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1111    pub fn send_request<T: NameValue, F: BufFactory>(
1112        &mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
1113    ) -> Result<u64> {
1114        // If we received a GOAWAY from the peer, MUST NOT initiate new
1115        // requests.
1116        if self.peer_goaway_id.is_some() {
1117            return Err(Error::FrameUnexpected);
1118        }
1119
1120        let stream_id = self.next_request_stream_id;
1121
1122        self.streams
1123            .insert(stream_id, <stream::Stream>::new(stream_id, true));
1124
1125        // The underlying QUIC stream does not exist yet, so calls to e.g.
1126        // stream_capacity() will fail. By writing a 0-length buffer, we force
1127        // the creation of the QUIC stream state, without actually writing
1128        // anything.
1129        if let Err(e) = conn.stream_send(stream_id, b"", false) {
1130            self.streams.remove(&stream_id);
1131
1132            if e == super::Error::Done {
1133                return Err(Error::StreamBlocked);
1134            }
1135
1136            return Err(e.into());
1137        };
1138
1139        self.send_headers(conn, stream_id, headers, fin)?;
1140
1141        // To avoid skipping stream IDs, we only calculate the next available
1142        // stream ID when a request has been successfully buffered.
1143        self.next_request_stream_id = self
1144            .next_request_stream_id
1145            .checked_add(4)
1146            .ok_or(Error::IdError)?;
1147
1148        Ok(stream_id)
1149    }
1150
1151    /// Sends an HTTP/3 response on the specified stream with default priority.
1152    ///
1153    /// This method sends the provided `headers` as a single initial response
1154    /// without a body.
1155    ///
1156    /// To send a non-final 1xx, then a final 200+ without body:
1157    ///   * send_response() with `fin` set to `false`.
1158    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1159    ///     `stream_id` value.
1160    ///
1161    /// To send a non-final 1xx, then a final 200+ with body:
1162    ///   * send_response() with `fin` set to `false`.
1163    ///   * [`send_additional_headers()`] with fin set to `false` and same
1164    ///     `stream_id` value.
1165    ///   * [`send_body()`] with same `stream_id`.
1166    ///
1167    /// To send a final 200+ with body:
1168    ///   * send_response() with `fin` set to `false`.
1169    ///   * [`send_body()`] with same `stream_id`.
1170    ///
1171    /// Additional headers can only be sent during certain phases of an HTTP/3
1172    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1173    /// error is returned if this method, or [`send_response_with_priority()`],
1174    /// are called multiple times with the same `stream_id` value.
1175    ///
1176    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1177    /// doesn't have enough capacity for the operation to complete. When this
1178    /// happens the application should retry the operation once the stream is
1179    /// reported as writable again.
1180    ///
1181    /// [`send_body()`]: struct.Connection.html#method.send_body
1182    /// [`send_additional_headers()`]:
1183    ///     struct.Connection.html#method.send_additional_headers
1184    /// [`send_response_with_priority()`]:
1185    ///     struct.Connection.html#method.send_response_with_priority
1186    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1187    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1188    pub fn send_response<T: NameValue, F: BufFactory>(
1189        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1190        headers: &[T], fin: bool,
1191    ) -> Result<()> {
1192        let priority = Default::default();
1193
1194        self.send_response_with_priority(
1195            conn, stream_id, headers, &priority, fin,
1196        )?;
1197
1198        Ok(())
1199    }
1200
1201    /// Sends an HTTP/3 response on the specified stream with specified
1202    /// priority.
1203    ///
1204    /// This method sends the provided `headers` as a single initial response
1205    /// without a body.
1206    ///
1207    /// To send a non-final 1xx, then a final 200+ without body:
1208    ///   * send_response_with_priority() with `fin` set to `false`.
1209    ///   * [`send_additional_headers()`] with fin set to `true` using the same
1210    ///     `stream_id` value.
1211    ///
1212    /// To send a non-final 1xx, then a final 200+ with body:
1213    ///   * send_response_with_priority() with `fin` set to `false`.
1214    ///   * [`send_additional_headers()`] with fin set to `false` and same
1215    ///     `stream_id` value.
1216    ///   * [`send_body()`] with same `stream_id`.
1217    ///
1218    /// To send a final 200+ with body:
1219    ///   * send_response_with_priority() with `fin` set to `false`.
1220    ///   * [`send_body()`] with same `stream_id`.
1221    ///
1222    /// The `priority` parameter represents [Extensible Priority]
1223    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1224    /// to 7.
1225    ///
1226    /// Additional headers can only be sent during certain phases of an HTTP/3
1227    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1228    /// error is returned if this method, or [`send_response()`],
1229    /// are called multiple times with the same `stream_id` value.
1230    ///
1231    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1232    /// doesn't have enough capacity for the operation to complete. When this
1233    /// happens the application should retry the operation once the stream is
1234    /// reported as writable again.
1235    ///
1236    /// [`send_body()`]: struct.Connection.html#method.send_body
1237    /// [`send_additional_headers()`]:
1238    ///     struct.Connection.html#method.send_additional_headers
1239    /// [`send_response()`]:
1240    ///     struct.Connection.html#method.send_response
1241    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1242    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1243    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1244    pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
1245        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1246        headers: &[T], priority: &Priority, fin: bool,
1247    ) -> Result<()> {
1248        match self.streams.get(&stream_id) {
1249            Some(s) => {
1250                // Only one initial HEADERS allowed.
1251                if s.local_initialized() {
1252                    return Err(Error::FrameUnexpected);
1253                }
1254
1255                s
1256            },
1257
1258            None => return Err(Error::FrameUnexpected),
1259        };
1260
1261        self.send_headers(conn, stream_id, headers, fin)?;
1262
1263        // Clamp and shift urgency into quiche-priority space
1264        let urgency = priority
1265            .urgency
1266            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1267            PRIORITY_URGENCY_OFFSET;
1268
1269        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1270
1271        Ok(())
1272    }
1273
1274    /// Sends additional HTTP/3 headers.
1275    ///
1276    /// After the initial request or response headers have been sent, using
1277    /// [`send_request()`] or [`send_response()`] respectively, this method can
1278    /// be used send an additional HEADERS frame. For example, to send a single
1279    /// instance of trailers after a request with a body, or to issue another
1280    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1281    /// a preceding 1xx.
1282    ///
1283    /// Additional headers can only be sent during certain phases of an HTTP/3
1284    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1285    /// error is returned when this method is called during the wrong phase,
1286    /// such as before initial headers have been sent, or if trailers have
1287    /// already been sent.
1288    ///
1289    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1290    /// doesn't have enough capacity for the operation to complete. When this
1291    /// happens the application should retry the operation once the stream is
1292    /// reported as writable again.
1293    ///
1294    /// [`send_request()`]: struct.Connection.html#method.send_request
1295    /// [`send_response()`]: struct.Connection.html#method.send_response
1296    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1297    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1298    /// [Section 4.1 of RFC 9114]:
1299    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1300    pub fn send_additional_headers<T: NameValue, F: BufFactory>(
1301        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1302        headers: &[T], is_trailer_section: bool, fin: bool,
1303    ) -> Result<()> {
1304        // Clients can only send trailer headers.
1305        if !self.is_server && !is_trailer_section {
1306            return Err(Error::FrameUnexpected);
1307        }
1308
1309        match self.streams.get(&stream_id) {
1310            Some(s) => {
1311                // Initial HEADERS must have been sent.
1312                if !s.local_initialized() {
1313                    return Err(Error::FrameUnexpected);
1314                }
1315
1316                // Only one trailing HEADERS allowed.
1317                if s.trailers_sent() {
1318                    return Err(Error::FrameUnexpected);
1319                }
1320
1321                s
1322            },
1323
1324            None => return Err(Error::FrameUnexpected),
1325        };
1326
1327        self.send_headers(conn, stream_id, headers, fin)?;
1328
1329        if is_trailer_section {
1330            // send_headers() might have tidied the stream away, so we need to
1331            // check again.
1332            if let Some(s) = self.streams.get_mut(&stream_id) {
1333                s.mark_trailers_sent();
1334            }
1335        }
1336
1337        Ok(())
1338    }
1339
1340    /// Sends additional HTTP/3 headers with specified priority.
1341    ///
1342    /// After the initial request or response headers have been sent, using
1343    /// [`send_request()`] or [`send_response()`] respectively, this method can
1344    /// be used send an additional HEADERS frame. For example, to send a single
1345    /// instance of trailers after a request with a body, or to issue another
1346    /// non-final 1xx after a preceding 1xx, or to issue a final response after
1347    /// a preceding 1xx.
1348    ///
1349    /// The `priority` parameter represents [Extensible Priority]
1350    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1351    /// to 7.
1352    ///
1353    /// Additional headers can only be sent during certain phases of an HTTP/3
1354    /// message exchange, see [Section 4.1 of RFC 9114]. The [`FrameUnexpected`]
1355    /// error is returned when this method is called during the wrong phase,
1356    /// such as before initial headers have been sent, or if trailers have
1357    /// already been sent.
1358    ///
1359    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1360    /// doesn't have enough capacity for the operation to complete. When this
1361    /// happens the application should retry the operation once the stream is
1362    /// reported as writable again.
1363    ///
1364    /// [`send_request()`]: struct.Connection.html#method.send_request
1365    /// [`send_response()`]: struct.Connection.html#method.send_response
1366    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1367    /// [`FrameUnexpected`]: enum.Error.html#variant.FrameUnexpected
1368    /// [Section 4.1 of RFC 9114]:
1369    ///     https://www.rfc-editor.org/rfc/rfc9114.html#section-4.1.
1370    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1371    pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
1372        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1373        headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
1374    ) -> Result<()> {
1375        self.send_additional_headers(
1376            conn,
1377            stream_id,
1378            headers,
1379            is_trailer_section,
1380            fin,
1381        )?;
1382
1383        // Clamp and shift urgency into quiche-priority space
1384        let urgency = priority
1385            .urgency
1386            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
1387            PRIORITY_URGENCY_OFFSET;
1388
1389        conn.stream_priority(stream_id, urgency, priority.incremental)?;
1390
1391        Ok(())
1392    }
1393
1394    fn encode_header_block<T: NameValue>(
1395        &mut self, headers: &[T],
1396    ) -> Result<Vec<u8>> {
1397        let headers_len = headers
1398            .iter()
1399            .fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
1400
1401        let mut header_block = vec![0; headers_len];
1402        let len = self
1403            .qpack_encoder
1404            .encode(headers, &mut header_block)
1405            .map_err(|_| Error::InternalError)?;
1406
1407        header_block.truncate(len);
1408
1409        Ok(header_block)
1410    }
1411
    /// Encodes `headers` and writes them as one HEADERS frame on `stream_id`.
    ///
    /// The frame is only written if the stream can take the frame header and
    /// the whole header block at once; otherwise `Error::StreamBlocked` is
    /// returned and nothing from the HEADERS frame is written.
    ///
    /// The first time this runs on a connection with greasing enabled,
    /// GREASE frames are sent on the stream before the HEADERS frame.
    ///
    /// On success the stream (if still tracked) is marked as locally
    /// initialized, and it is dropped from the stream map when `fin` was
    /// requested and the transport reports the stream as finished.
    fn send_headers<T: NameValue, F: BufFactory>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
        headers: &[T], fin: bool,
    ) -> Result<()> {
        // Scratch buffer for the frame header (type and length varints).
        // Only the bytes actually written to it are sent.
        let mut d = [42; 10];
        let mut b = octets::OctetsMut::with_slice(&mut d);

        // GREASE frames are sent at most once per connection.
        if !self.frames_greased && conn.grease {
            self.send_grease_frames(conn, stream_id)?;
            self.frames_greased = true;
        }

        let header_block = self.encode_header_block(headers)?;

        // Size of the frame header: frame type varint plus length varint.
        let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
            octets::varint_len(header_block.len() as u64);

        // Headers need to be sent atomically, so make sure the stream has
        // enough capacity.
        match conn.stream_writable(stream_id, overhead + header_block.len()) {
            Ok(true) => (),

            Ok(false) => return Err(Error::StreamBlocked),

            Err(e) => {
                // Stop tracking the stream if the transport already
                // considers it finished.
                if conn.stream_finished(stream_id) {
                    self.streams.remove(&stream_id);
                }

                return Err(e.into());
            },
        };

        // Write the frame header first; the fin flag, if any, goes with the
        // header block below.
        b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
        b.put_varint(header_block.len() as u64)?;
        let off = b.off();
        conn.stream_send(stream_id, &d[..off], false)?;

        // Sending header block separately avoids unnecessary copy.
        conn.stream_send(stream_id, &header_block, fin)?;

        trace!(
            "{} tx frm HEADERS stream={} len={} fin={}",
            conn.trace_id(),
            stream_id,
            header_block.len(),
            fin
        );

        // Record the frame in qlog, with header names and values rendered
        // as (lossy) UTF-8 strings.
        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
            let qlog_headers = headers
                .iter()
                .map(|h| qlog::events::http3::HttpHeader {
                    name: Some(String::from_utf8_lossy(h.name()).into_owned()),
                    name_bytes: None,
                    value: Some(String::from_utf8_lossy(h.value()).into_owned()),
                    value_bytes: None,
                })
                .collect();

            let frame = Http3Frame::Headers {
                headers: qlog_headers,
                raw: None,
            };
            let ev_data = EventData::Http3FrameCreated(FrameCreated {
                stream_id,
                length: Some(header_block.len() as u64),
                frame,
                ..Default::default()
            });

            q.add_event_data_now(ev_data).ok();
        });

        // Remember that the initial HEADERS has now been sent on this stream.
        if let Some(s) = self.streams.get_mut(&stream_id) {
            s.initialize_local();
        }

        // Nothing more to track once the stream is finished.
        if fin && conn.stream_finished(stream_id) {
            self.streams.remove(&stream_id);
        }

        Ok(())
    }
1496
1497    /// Sends an HTTP/3 body chunk on the given stream.
1498    ///
1499    /// On success the number of bytes written is returned, or [`Done`] if no
1500    /// bytes could be written (e.g. because the stream is blocked).
1501    ///
1502    /// Note that the number of written bytes returned can be lower than the
1503    /// length of the input buffer when the underlying QUIC stream doesn't have
1504    /// enough capacity for the operation to complete.
1505    ///
1506    /// When a partial write happens (including when [`Done`] is returned) the
1507    /// application should retry the operation once the stream is reported as
1508    /// writable again.
1509    ///
1510    /// [`Done`]: enum.Error.html#variant.Done
1511    pub fn send_body<F: BufFactory>(
1512        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
1513        fin: bool,
1514    ) -> Result<usize> {
1515        self.do_send_body(
1516            conn,
1517            stream_id,
1518            body,
1519            fin,
1520            |conn: &mut super::Connection<F>,
1521             header: &[u8],
1522             stream_id: u64,
1523             body: &[u8],
1524             body_len: usize,
1525             fin: bool| {
1526                conn.stream_send(stream_id, header, false)?;
1527                Ok(conn
1528                    .stream_send(stream_id, &body[..body_len], fin)
1529                    .map(|v| (v, v))?)
1530            },
1531        )
1532    }
1533
    /// Sends an HTTP/3 body chunk provided as a raw buffer on the given stream.
    ///
    /// If the capacity allows it the buffer will be appended to the stream's
    /// send queue with zero copying.
    ///
    /// On success the number of bytes written is returned, or [`Done`] if no
    /// bytes could be written (e.g. because the stream is blocked).
    ///
    /// Note that the number of written bytes returned can be lower than the
    /// length of the input buffer when the underlying QUIC stream doesn't have
    /// enough capacity for the operation to complete.
    ///
    /// When the transport hands back a remaining (unwritten) buffer after a
    /// partial write, that remainder is stored back into `body`. The
    /// application should retry the operation once the stream is reported as
    /// writable again.
    ///
    /// [`Done`]: enum.Error.html#variant.Done
    pub fn send_body_zc<F>(
        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
        body: &mut F::Buf, fin: bool,
    ) -> Result<usize>
    where
        F: BufFactory,
        F::Buf: BufSplit,
    {
        self.do_send_body(
            conn,
            stream_id,
            body,
            fin,
            |conn: &mut super::Connection<F>,
             header: &[u8],
             stream_id: u64,
             body: &mut F::Buf,
             mut body_len: usize,
             fin: bool| {
                // NOTE(review): presumably this tries to prepend the frame
                // header to the buffer in place and reports whether that
                // worked -- confirm against the `BufSplit` documentation.
                let with_prefix = body.try_add_prefix(header);
                if !with_prefix {
                    // The header has to be sent as a separate write.
                    conn.stream_send(stream_id, header, false)?;
                } else {
                    // The header is now part of the buffer, so it counts
                    // towards the number of bytes to send.
                    body_len += header.len();
                }

                let (mut n, rem) = conn.stream_send_zc(
                    stream_id,
                    body.clone(),
                    Some(body_len),
                    fin,
                )?;

                // Report body bytes only, excluding the prefixed header.
                if with_prefix {
                    n -= header.len();
                }

                // Leave whatever was not sent in the caller's buffer.
                if let Some(rem) = rem {
                    let _ = std::mem::replace(body, rem);
                }

                Ok((n, n))
            },
        )
    }
1597
1598    fn do_send_body<F, B, R, SND>(
1599        &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
1600        fin: bool, write_fn: SND,
1601    ) -> Result<R>
1602    where
1603        F: BufFactory,
1604        B: AsRef<[u8]>,
1605        SND: FnOnce(
1606            &mut super::Connection<F>,
1607            &[u8],
1608            u64,
1609            B,
1610            usize,
1611            bool,
1612        ) -> Result<(usize, R)>,
1613    {
1614        let mut d = [42; 10];
1615        let mut b = octets::OctetsMut::with_slice(&mut d);
1616
1617        let len = body.as_ref().len();
1618
1619        // Validate that it is sane to send data on the stream.
1620        if stream_id % 4 != 0 {
1621            return Err(Error::FrameUnexpected);
1622        }
1623
1624        match self.streams.get_mut(&stream_id) {
1625            Some(s) => {
1626                if !s.local_initialized() {
1627                    return Err(Error::FrameUnexpected);
1628                }
1629
1630                if s.trailers_sent() {
1631                    return Err(Error::FrameUnexpected);
1632                }
1633            },
1634
1635            None => {
1636                return Err(Error::FrameUnexpected);
1637            },
1638        };
1639
1640        // Avoid sending 0-length DATA frames when the fin flag is false.
1641        if len == 0 && !fin {
1642            return Err(Error::Done);
1643        }
1644
1645        let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
1646            octets::varint_len(len as u64);
1647
1648        let stream_cap = match conn.stream_capacity(stream_id) {
1649            Ok(v) => v,
1650
1651            Err(e) => {
1652                if conn.stream_finished(stream_id) {
1653                    self.streams.remove(&stream_id);
1654                }
1655
1656                return Err(e.into());
1657            },
1658        };
1659
1660        // Make sure there is enough capacity to send the DATA frame header.
1661        if stream_cap < overhead {
1662            let _ = conn.stream_writable(stream_id, overhead + 1);
1663            return Err(Error::Done);
1664        }
1665
1666        // Cap the frame payload length to the stream's capacity.
1667        let body_len = std::cmp::min(len, stream_cap - overhead);
1668
1669        // If we can't send the entire body, set the fin flag to false so the
1670        // application can try again later.
1671        let fin = if body_len != len { false } else { fin };
1672
1673        // Again, avoid sending 0-length DATA frames when the fin flag is false.
1674        if body_len == 0 && !fin {
1675            let _ = conn.stream_writable(stream_id, overhead + 1);
1676            return Err(Error::Done);
1677        }
1678
1679        b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
1680        b.put_varint(body_len as u64)?;
1681        let off = b.off();
1682
1683        // Return how many bytes were written, excluding the frame header.
1684        // Sending body separately avoids unnecessary copy.
1685        let (written, ret) =
1686            write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
1687
1688        trace!(
1689            "{} tx frm DATA stream={} len={} fin={}",
1690            conn.trace_id(),
1691            stream_id,
1692            written,
1693            fin
1694        );
1695
1696        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1697            let frame = Http3Frame::Data { raw: None };
1698            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1699                stream_id,
1700                length: Some(written as u64),
1701                frame,
1702                ..Default::default()
1703            });
1704
1705            q.add_event_data_now(ev_data).ok();
1706        });
1707
1708        if written < len {
1709            // Ensure the peer is notified that the connection or stream is
1710            // blocked when the stream's capacity is limited by flow control.
1711            //
1712            // We only need enough capacity to send a few bytes, to make sure
1713            // the stream doesn't hang due to congestion window not growing
1714            // enough.
1715            let _ = conn.stream_writable(stream_id, overhead + 1);
1716        }
1717
1718        if fin && written == len && conn.stream_finished(stream_id) {
1719            self.streams.remove(&stream_id);
1720        }
1721
1722        Ok(ret)
1723    }
1724
1725    /// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
1726    ///
1727    /// Support is signalled by the peer's SETTINGS, so this method always
1728    /// returns false until they have been processed using the [`poll()`]
1729    /// method.
1730    ///
1731    /// [`poll()`]: struct.Connection.html#method.poll
1732    pub fn dgram_enabled_by_peer<F: BufFactory>(
1733        &self, conn: &super::Connection<F>,
1734    ) -> bool {
1735        self.peer_settings.h3_datagram == Some(1) &&
1736            conn.dgram_max_writable_len().is_some()
1737    }
1738
1739    /// Returns whether the peer enabled extended CONNECT support.
1740    ///
1741    /// Support is signalled by the peer's SETTINGS, so this method always
1742    /// returns false until they have been processed using the [`poll()`]
1743    /// method.
1744    ///
1745    /// [`poll()`]: struct.Connection.html#method.poll
1746    pub fn extended_connect_enabled_by_peer(&self) -> bool {
1747        self.peer_settings.connect_protocol_enabled == Some(1)
1748    }
1749
1750    /// Reads request or response body data into the provided buffer.
1751    ///
1752    /// Applications should call this method whenever the [`poll()`] method
1753    /// returns a [`Data`] event.
1754    ///
1755    /// On success the amount of bytes read is returned, or [`Done`] if there
1756    /// is no data to read.
1757    ///
1758    /// [`poll()`]: struct.Connection.html#method.poll
1759    /// [`Data`]: enum.Event.html#variant.Data
1760    /// [`Done`]: enum.Error.html#variant.Done
1761    pub fn recv_body<F: BufFactory>(
1762        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1763        out: &mut [u8],
1764    ) -> Result<usize> {
1765        self.recv_body_buf(conn, stream_id, out)
1766    }
1767
1768    /// Reads request or response body data into the provided BufMut buffer.
1769    ///
1770    /// **NOTE**:
1771    /// The BufMut will be populated with all available data up to its capacity.
1772    /// Since some BufMut implementations, e.g., [`Vec<u8>`], dynamically
1773    /// allocate additional memory, the caller may use
1774    /// [`BufMut::limit()`] to limit the maximum amount of data that
1775    /// can be written.
1776    ///
1777    /// Applications should call this method (or [`recv_body()`]) whenever
1778    /// the [`poll()`] method returns a [`Data`] event.
1779    ///
1780    /// On success the amount of bytes read is returned, or [`Done`] if there
1781    /// is no data to read.
1782    ///
1783    /// [`BufMut::limit()`]: bytes::BufMut::limit()
1784    /// [`recv_body()`]: Self::recv_body()
1785    /// [`poll()`]: struct.Connection.html#method.poll
1786    /// [`Data`]: enum.Event.html#variant.Data
1787    /// [`Done`]: enum.Error.html#variant.Done
1788    ///
1789    /// ## Example:
1790    /// ```no_run
1791    /// # use quiche::h3;
1792    /// fn receive(
1793    ///     qconn: &mut quiche::Connection, h3conn: &mut h3::Connection,
1794    /// ) -> Result<Vec<u8>, h3::Error> {
1795    ///     use bytes::BufMut as _;
1796    ///     let mut buffer = Vec::with_capacity(2048).limit(2048);
1797    ///     let bytes = h3conn.recv_body_buf(qconn, 0, &mut buffer)?;
1798    ///     let buffer = buffer.into_inner();
1799    ///     // The vec has been filled with exactly `bytes` number of bytes
1800    ///     assert_eq!(buffer.len(), bytes);
1801    ///     Ok(buffer)
1802    /// }
1803    /// ```
1804    pub fn recv_body_buf<F: BufFactory, OUT: bytes::BufMut>(
1805        &mut self, conn: &mut super::Connection<F>, stream_id: u64, mut out: OUT,
1806    ) -> Result<usize> {
1807        let mut total = 0;
1808
1809        // Try to consume all buffered data for the stream, even across multiple
1810        // DATA frames.
1811        // Note, that even if the BufMut does not have a limit defined, we are
1812        // inherently limited by how much data is in quiche's receive buffer for
1813        // that stream, so the BufMut cannot grow unbounded.
1814        while out.has_remaining_mut() {
1815            let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
1816
1817            if stream.state() != stream::State::Data {
1818                break;
1819            }
1820
1821            let (read, fin) = match stream.try_consume_data(conn, &mut out) {
1822                Ok(v) => v,
1823
1824                Err(Error::Done) => break,
1825
1826                Err(e) => return Err(e),
1827            };
1828
1829            total += read;
1830
1831            // No more data to read, we are done.
1832            if read == 0 || fin {
1833                break;
1834            }
1835
1836            // Process incoming data from the stream. For example, if a whole
1837            // DATA frame was consumed, and another one is queued behind it,
1838            // this will ensure the additional data will also be returned to
1839            // the application.
1840            match self.process_readable_stream(conn, stream_id, false) {
1841                Ok(_) => unreachable!(),
1842
1843                Err(Error::Done) => (),
1844
1845                Err(e) => return Err(e),
1846            };
1847
1848            if conn.stream_finished(stream_id) {
1849                break;
1850            }
1851        }
1852
1853        // While body is being received, the stream is marked as finished only
1854        // when all data is read by the application.
1855        if conn.stream_finished(stream_id) {
1856            self.process_finished_stream(stream_id);
1857        }
1858
1859        if total == 0 {
1860            return Err(Error::Done);
1861        }
1862
1863        Ok(total)
1864    }
1865
1866    /// Sends a PRIORITY_UPDATE frame on the control stream with specified
1867    /// request stream ID and priority.
1868    ///
1869    /// The `priority` parameter represents [Extensible Priority]
1870    /// parameters. If the urgency is outside the range 0-7, it will be clamped
1871    /// to 7.
1872    ///
1873    /// The [`StreamBlocked`] error is returned when the underlying QUIC stream
1874    /// doesn't have enough capacity for the operation to complete. When this
1875    /// happens the application should retry the operation once the stream is
1876    /// reported as writable again.
1877    ///
1878    /// [`StreamBlocked`]: enum.Error.html#variant.StreamBlocked
1879    /// [Extensible Priority]: https://www.rfc-editor.org/rfc/rfc9218.html#section-4.
1880    pub fn send_priority_update_for_request<F: BufFactory>(
1881        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
1882        priority: &Priority,
1883    ) -> Result<()> {
1884        let mut d = [42; 20];
1885        let mut b = octets::OctetsMut::with_slice(&mut d);
1886
1887        // Validate that it is sane to send PRIORITY_UPDATE.
1888        if self.is_server {
1889            return Err(Error::FrameUnexpected);
1890        }
1891
1892        if stream_id % 4 != 0 {
1893            return Err(Error::FrameUnexpected);
1894        }
1895
1896        let control_stream_id =
1897            self.control_stream_id.ok_or(Error::FrameUnexpected)?;
1898
1899        let urgency = priority
1900            .urgency
1901            .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
1902
1903        let mut field_value = format!("u={urgency}");
1904
1905        if priority.incremental {
1906            field_value.push_str(",i");
1907        }
1908
1909        let priority_field_value = field_value.as_bytes();
1910        let frame_payload_len =
1911            octets::varint_len(stream_id) + priority_field_value.len();
1912
1913        let overhead =
1914            octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
1915                octets::varint_len(stream_id) +
1916                octets::varint_len(frame_payload_len as u64);
1917
1918        // Make sure the control stream has enough capacity.
1919        match conn.stream_writable(
1920            control_stream_id,
1921            overhead + priority_field_value.len(),
1922        ) {
1923            Ok(true) => (),
1924
1925            Ok(false) => return Err(Error::StreamBlocked),
1926
1927            Err(e) => {
1928                return Err(e.into());
1929            },
1930        }
1931
1932        b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
1933        b.put_varint(frame_payload_len as u64)?;
1934        b.put_varint(stream_id)?;
1935        let off = b.off();
1936        conn.stream_send(control_stream_id, &d[..off], false)?;
1937
1938        // Sending field value separately avoids unnecessary copy.
1939        conn.stream_send(control_stream_id, priority_field_value, false)?;
1940
1941        trace!(
1942            "{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
1943            conn.trace_id(),
1944            stream_id,
1945            field_value,
1946        );
1947
1948        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
1949            let frame = Http3Frame::PriorityUpdate {
1950                stream_id: Some(stream_id),
1951                push_id: None,
1952                priority_field_value: field_value.clone(),
1953                raw: None,
1954            };
1955
1956            let ev_data = EventData::Http3FrameCreated(FrameCreated {
1957                stream_id,
1958                length: Some(priority_field_value.len() as u64),
1959                frame,
1960                ..Default::default()
1961            });
1962
1963            q.add_event_data_now(ev_data).ok();
1964        });
1965
1966        Ok(())
1967    }
1968
1969    /// Take the last PRIORITY_UPDATE for a prioritized element ID.
1970    ///
1971    /// When the [`poll()`] method returns a [`PriorityUpdate`] event for a
1972    /// prioritized element, the event has triggered and will not rearm until
1973    /// applications call this method. It is recommended that applications defer
1974    /// taking the PRIORITY_UPDATE until after [`poll()`] returns [`Done`].
1975    ///
1976    /// On success the Priority Field Value is returned, or [`Done`] if there is
1977    /// no PRIORITY_UPDATE to read (either because there is no value to take, or
1978    /// because the prioritized element does not exist).
1979    ///
1980    /// [`poll()`]: struct.Connection.html#method.poll
1981    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
1982    /// [`Done`]: enum.Error.html#variant.Done
1983    pub fn take_last_priority_update(
1984        &mut self, prioritized_element_id: u64,
1985    ) -> Result<Vec<u8>> {
1986        if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
1987            return stream.take_last_priority_update().ok_or(Error::Done);
1988        }
1989
1990        Err(Error::Done)
1991    }
1992
1993    /// Processes HTTP/3 data received from the peer.
1994    ///
1995    /// On success it returns an [`Event`] and an ID, or [`Done`] when there are
1996    /// no events to report.
1997    ///
1998    /// Note that all events are edge-triggered, meaning that once reported they
1999    /// will not be reported again by calling this method again, until the event
2000    /// is re-armed.
2001    ///
2002    /// The events [`Headers`], [`Data`] and [`Finished`] return a stream ID,
2003    /// which is used in methods [`recv_body()`], [`send_response()`] or
2004    /// [`send_body()`].
2005    ///
2006    /// The event [`GoAway`] returns an ID that depends on the connection role.
2007    /// A client receives the largest processed stream ID. A server receives the
2008    /// the largest permitted push ID.
2009    ///
2010    /// The event [`PriorityUpdate`] only occurs at servers. It returns a
2011    /// prioritized element ID that is used in the method
2012    /// [`take_last_priority_update()`], which rearms the event for that ID.
2013    ///
2014    /// If an error occurs while processing data, the connection is closed with
2015    /// the appropriate error code, using the transport's [`close()`] method.
2016    ///
2017    /// [`Event`]: enum.Event.html
2018    /// [`Done`]: enum.Error.html#variant.Done
2019    /// [`Headers`]: enum.Event.html#variant.Headers
2020    /// [`Data`]: enum.Event.html#variant.Data
2021    /// [`Finished`]: enum.Event.html#variant.Finished
2022    /// [`GoAway`]: enum.Event.html#variant.GoAWay
2023    /// [`PriorityUpdate`]: enum.Event.html#variant.PriorityUpdate
2024    /// [`recv_body()`]: struct.Connection.html#method.recv_body
2025    /// [`send_response()`]: struct.Connection.html#method.send_response
2026    /// [`send_body()`]: struct.Connection.html#method.send_body
2027    /// [`recv_dgram()`]: struct.Connection.html#method.recv_dgram
2028    /// [`take_last_priority_update()`]: struct.Connection.html#method.take_last_priority_update
2029    /// [`close()`]: ../struct.Connection.html#method.close
2030    pub fn poll<F: BufFactory>(
2031        &mut self, conn: &mut super::Connection<F>,
2032    ) -> Result<(u64, Event)> {
2033        // When connection close is initiated by the local application (e.g. due
2034        // to a protocol error), the connection itself might be in a broken
2035        // state, so return early.
2036        if conn.local_error.is_some() {
2037            return Err(Error::Done);
2038        }
2039
2040        // Process control streams first.
2041        if let Some(stream_id) = self.peer_control_stream_id {
2042            match self.process_control_stream(conn, stream_id) {
2043                Ok(ev) => return Ok(ev),
2044
2045                Err(Error::Done) => (),
2046
2047                Err(e) => return Err(e),
2048            };
2049        }
2050
2051        if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
2052            match self.process_control_stream(conn, stream_id) {
2053                Ok(ev) => return Ok(ev),
2054
2055                Err(Error::Done) => (),
2056
2057                Err(e) => return Err(e),
2058            };
2059        }
2060
2061        if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
2062            match self.process_control_stream(conn, stream_id) {
2063                Ok(ev) => return Ok(ev),
2064
2065                Err(Error::Done) => (),
2066
2067                Err(e) => return Err(e),
2068            };
2069        }
2070
2071        // Process finished streams list.
2072        if let Some(finished) = self.finished_streams.pop_front() {
2073            return Ok((finished, Event::Finished));
2074        }
2075
2076        // Process HTTP/3 data from readable streams.
2077        for s in conn.readable() {
2078            trace!("{} stream id {} is readable", conn.trace_id(), s);
2079
2080            let ev = match self.process_readable_stream(conn, s, true) {
2081                Ok(v) => Some(v),
2082
2083                Err(Error::Done) => None,
2084
2085                // Return early if the stream was reset, to avoid returning
2086                // a Finished event later as well.
2087                Err(Error::TransportError(crate::Error::StreamReset(e))) =>
2088                    return Ok((s, Event::Reset(e))),
2089
2090                Err(e) => return Err(e),
2091            };
2092
2093            if conn.stream_finished(s) {
2094                self.process_finished_stream(s);
2095            }
2096
2097            // TODO: check if stream is completed so it can be freed
2098            if let Some(ev) = ev {
2099                return Ok(ev);
2100            }
2101        }
2102
2103        // Process finished streams list once again, to make sure `Finished`
2104        // events are returned when receiving empty stream frames with the fin
2105        // flag set.
2106        if let Some(finished) = self.finished_streams.pop_front() {
2107            if conn.stream_readable(finished) {
2108                // The stream is finished, but is still readable, it may
2109                // indicate that there is a pending error, such as reset.
2110                if let Err(crate::Error::StreamReset(e)) =
2111                    conn.stream_recv(finished, &mut [])
2112                {
2113                    return Ok((finished, Event::Reset(e)));
2114                }
2115            }
2116            return Ok((finished, Event::Finished));
2117        }
2118
2119        Err(Error::Done)
2120    }
2121
2122    /// Sends a GOAWAY frame to initiate graceful connection closure.
2123    ///
2124    /// When quiche is used in the server role, the `id` parameter is the stream
2125    /// ID of the highest processed request. This can be any valid ID between 0
2126    /// and 2^62-4. However, the ID cannot be increased. Failure to satisfy
2127    /// these conditions will return an error.
2128    ///
2129    /// This method does not close the QUIC connection. Applications are
2130    /// required to call [`close()`] themselves.
2131    ///
2132    /// [`close()`]: ../struct.Connection.html#method.close
2133    pub fn send_goaway<F: BufFactory>(
2134        &mut self, conn: &mut super::Connection<F>, id: u64,
2135    ) -> Result<()> {
2136        let mut id = id;
2137
2138        // TODO: server push
2139        //
2140        // In the meantime always send 0 from client.
2141        if !self.is_server {
2142            id = 0;
2143        }
2144
2145        if self.is_server && id % 4 != 0 {
2146            return Err(Error::IdError);
2147        }
2148
2149        if let Some(sent_id) = self.local_goaway_id {
2150            if id > sent_id {
2151                return Err(Error::IdError);
2152            }
2153        }
2154
2155        if let Some(stream_id) = self.control_stream_id {
2156            let mut d = [42; 10];
2157            let mut b = octets::OctetsMut::with_slice(&mut d);
2158
2159            let frame = frame::Frame::GoAway { id };
2160
2161            let wire_len = frame.to_bytes(&mut b)?;
2162            let stream_cap = conn.stream_capacity(stream_id)?;
2163
2164            if stream_cap < wire_len {
2165                return Err(Error::StreamBlocked);
2166            }
2167
2168            trace!("{} tx frm {:?}", conn.trace_id(), frame);
2169
2170            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2171                let ev_data = EventData::Http3FrameCreated(FrameCreated {
2172                    stream_id,
2173                    length: Some(octets::varint_len(id) as u64),
2174                    frame: frame.to_qlog(),
2175                    ..Default::default()
2176                });
2177
2178                q.add_event_data_now(ev_data).ok();
2179            });
2180
2181            let off = b.off();
2182            conn.stream_send(stream_id, &d[..off], false)?;
2183
2184            self.local_goaway_id = Some(id);
2185        }
2186
2187        Ok(())
2188    }
2189
2190    /// Gets the raw settings from peer including unknown and reserved types.
2191    ///
2192    /// The order of settings is the same as received in the SETTINGS frame.
2193    pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
2194        self.peer_settings.raw.as_deref()
2195    }
2196
2197    fn open_uni_stream<F: BufFactory>(
2198        &mut self, conn: &mut super::Connection<F>, ty: u64,
2199    ) -> Result<u64> {
2200        let stream_id = self.next_uni_stream_id;
2201
2202        let mut d = [0; 8];
2203        let mut b = octets::OctetsMut::with_slice(&mut d);
2204
2205        match ty {
2206            // Control and QPACK streams are the most important to schedule.
2207            stream::HTTP3_CONTROL_STREAM_TYPE_ID |
2208            stream::QPACK_ENCODER_STREAM_TYPE_ID |
2209            stream::QPACK_DECODER_STREAM_TYPE_ID => {
2210                conn.stream_priority(stream_id, 0, false)?;
2211            },
2212
2213            // TODO: Server push
2214            stream::HTTP3_PUSH_STREAM_TYPE_ID => (),
2215
2216            // Anything else is a GREASE stream, so make it the least important.
2217            _ => {
2218                conn.stream_priority(stream_id, 255, false)?;
2219            },
2220        }
2221
2222        conn.stream_send(stream_id, b.put_varint(ty)?, false)?;
2223
2224        // To avoid skipping stream IDs, we only calculate the next available
2225        // stream ID when data has been successfully buffered.
2226        self.next_uni_stream_id = self
2227            .next_uni_stream_id
2228            .checked_add(4)
2229            .ok_or(Error::IdError)?;
2230
2231        Ok(stream_id)
2232    }
2233
2234    fn open_qpack_encoder_stream<F: BufFactory>(
2235        &mut self, conn: &mut super::Connection<F>,
2236    ) -> Result<()> {
2237        let stream_id =
2238            self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;
2239
2240        self.local_qpack_streams.encoder_stream_id = Some(stream_id);
2241
2242        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2243            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2244                stream_id,
2245                initiator: Some(Initiator::Local),
2246                stream_type: StreamType::QpackEncode,
2247                ..Default::default()
2248            });
2249
2250            q.add_event_data_now(ev_data).ok();
2251        });
2252
2253        Ok(())
2254    }
2255
2256    fn open_qpack_decoder_stream<F: BufFactory>(
2257        &mut self, conn: &mut super::Connection<F>,
2258    ) -> Result<()> {
2259        let stream_id =
2260            self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;
2261
2262        self.local_qpack_streams.decoder_stream_id = Some(stream_id);
2263
2264        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2265            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2266                stream_id,
2267                initiator: Some(Initiator::Local),
2268                stream_type: StreamType::QpackDecode,
2269                ..Default::default()
2270            });
2271
2272            q.add_event_data_now(ev_data).ok();
2273        });
2274
2275        Ok(())
2276    }
2277
2278    /// Send GREASE frames on the provided stream ID.
2279    fn send_grease_frames<F: BufFactory>(
2280        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2281    ) -> Result<()> {
2282        let mut d = [0; 8];
2283
2284        let stream_cap = match conn.stream_capacity(stream_id) {
2285            Ok(v) => v,
2286
2287            Err(e) => {
2288                if conn.stream_finished(stream_id) {
2289                    self.streams.remove(&stream_id);
2290                }
2291
2292                return Err(e.into());
2293            },
2294        };
2295
2296        let grease_frame1 = grease_value();
2297        let grease_frame2 = grease_value();
2298        let grease_payload = b"GREASE is the word";
2299
2300        let overhead = octets::varint_len(grease_frame1) + // frame type
2301            1 + // payload len
2302            octets::varint_len(grease_frame2) + // frame type
2303            1 + // payload len
2304            grease_payload.len(); // payload
2305
2306        // Don't send GREASE if there is not enough capacity for it. Greasing
2307        // will _not_ be attempted again later on.
2308        if stream_cap < overhead {
2309            return Ok(());
2310        }
2311
2312        // Empty GREASE frame.
2313        let mut b = octets::OctetsMut::with_slice(&mut d);
2314        conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
2315
2316        let mut b = octets::OctetsMut::with_slice(&mut d);
2317        conn.stream_send(stream_id, b.put_varint(0)?, false)?;
2318
2319        trace!(
2320            "{} tx frm GREASE stream={} len=0",
2321            conn.trace_id(),
2322            stream_id
2323        );
2324
2325        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2326            let frame = Http3Frame::Reserved {
2327                frame_type_bytes: grease_frame1,
2328                raw: None,
2329            };
2330            let ev_data = EventData::Http3FrameCreated(FrameCreated {
2331                stream_id,
2332                length: Some(0),
2333                frame,
2334                ..Default::default()
2335            });
2336
2337            q.add_event_data_now(ev_data).ok();
2338        });
2339
2340        // GREASE frame with payload.
2341        let mut b = octets::OctetsMut::with_slice(&mut d);
2342        conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
2343
2344        let mut b = octets::OctetsMut::with_slice(&mut d);
2345        conn.stream_send(stream_id, b.put_varint(18)?, false)?;
2346
2347        conn.stream_send(stream_id, grease_payload, false)?;
2348
2349        trace!(
2350            "{} tx frm GREASE stream={} len={}",
2351            conn.trace_id(),
2352            stream_id,
2353            grease_payload.len()
2354        );
2355
2356        qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2357            let frame = Http3Frame::Reserved {
2358                frame_type_bytes: grease_frame2,
2359                raw: None,
2360            };
2361            let ev_data = EventData::Http3FrameCreated(FrameCreated {
2362                stream_id,
2363                length: Some(grease_payload.len() as u64),
2364                frame,
2365                ..Default::default()
2366            });
2367
2368            q.add_event_data_now(ev_data).ok();
2369        });
2370
2371        Ok(())
2372    }
2373
2374    /// Opens a new unidirectional stream with a GREASE type and sends some
2375    /// unframed payload.
2376    fn open_grease_stream<F: BufFactory>(
2377        &mut self, conn: &mut super::Connection<F>,
2378    ) -> Result<()> {
2379        let ty = grease_value();
2380        match self.open_uni_stream(conn, ty) {
2381            Ok(stream_id) => {
2382                conn.stream_send(stream_id, b"GREASE is the word", true)?;
2383
2384                trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);
2385
2386                qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2387                    let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2388                        stream_id,
2389                        initiator: Some(Initiator::Local),
2390                        stream_type: StreamType::Unknown,
2391                        stream_type_bytes: Some(ty),
2392                        ..Default::default()
2393                    });
2394
2395                    q.add_event_data_now(ev_data).ok();
2396                });
2397            },
2398
2399            Err(Error::IdError) => {
2400                trace!("{} GREASE stream blocked", conn.trace_id(),);
2401
2402                return Ok(());
2403            },
2404
2405            Err(e) => return Err(e),
2406        };
2407
2408        Ok(())
2409    }
2410
2411    /// Sends SETTINGS frame based on HTTP/3 configuration.
2412    fn send_settings<F: BufFactory>(
2413        &mut self, conn: &mut super::Connection<F>,
2414    ) -> Result<()> {
2415        let stream_id = match self
2416            .open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
2417        {
2418            Ok(v) => v,
2419
2420            Err(e) => {
2421                trace!("{} Control stream blocked", conn.trace_id(),);
2422
2423                if e == Error::Done {
2424                    return Err(Error::InternalError);
2425                }
2426
2427                return Err(e);
2428            },
2429        };
2430
2431        self.control_stream_id = Some(stream_id);
2432
2433        qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2434            let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
2435                stream_id,
2436                initiator: Some(Initiator::Local),
2437                stream_type: StreamType::Control,
2438                ..Default::default()
2439            });
2440
2441            q.add_event_data_now(ev_data).ok();
2442        });
2443
2444        let grease = if conn.grease {
2445            Some((grease_value(), grease_value()))
2446        } else {
2447            None
2448        };
2449
2450        let frame = frame::Frame::Settings {
2451            max_field_section_size: self.local_settings.max_field_section_size,
2452            qpack_max_table_capacity: self
2453                .local_settings
2454                .qpack_max_table_capacity,
2455            qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
2456            connect_protocol_enabled: self
2457                .local_settings
2458                .connect_protocol_enabled,
2459            h3_datagram: self.local_settings.h3_datagram,
2460            grease,
2461            additional_settings: self.local_settings.additional_settings.clone(),
2462            raw: Default::default(),
2463        };
2464
2465        let mut d = [42; 128];
2466        let mut b = octets::OctetsMut::with_slice(&mut d);
2467
2468        frame.to_bytes(&mut b)?;
2469
2470        let off = b.off();
2471
2472        if let Some(id) = self.control_stream_id {
2473            conn.stream_send(id, &d[..off], false)?;
2474
2475            trace!(
2476                "{} tx frm SETTINGS stream={} len={}",
2477                conn.trace_id(),
2478                id,
2479                off
2480            );
2481
2482            qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
2483                let frame = frame.to_qlog();
2484                let ev_data = EventData::Http3FrameCreated(FrameCreated {
2485                    stream_id: id,
2486                    length: Some(off as u64),
2487                    frame,
2488                    ..Default::default()
2489                });
2490
2491                q.add_event_data_now(ev_data).ok();
2492            });
2493        }
2494
2495        Ok(())
2496    }
2497
2498    fn process_control_stream<F: BufFactory>(
2499        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2500    ) -> Result<(u64, Event)> {
2501        close_conn_if_critical_stream_finished(conn, stream_id)?;
2502
2503        if !conn.stream_readable(stream_id) {
2504            return Err(Error::Done);
2505        }
2506
2507        match self.process_readable_stream(conn, stream_id, true) {
2508            Ok(ev) => return Ok(ev),
2509
2510            Err(Error::Done) => (),
2511
2512            Err(e) => return Err(e),
2513        };
2514
2515        close_conn_if_critical_stream_finished(conn, stream_id)?;
2516
2517        Err(Error::Done)
2518    }
2519
2520    fn process_readable_stream<F: BufFactory>(
2521        &mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
2522    ) -> Result<(u64, Event)> {
2523        self.streams
2524            .entry(stream_id)
2525            .or_insert_with(|| <stream::Stream>::new(stream_id, false));
2526
2527        // We need to get a fresh reference to the stream for each
2528        // iteration, to avoid borrowing `self` for the entire duration
2529        // of the loop, because we'll need to borrow it again in the
2530        // `State::FramePayload` case below.
2531        while let Some(stream) = self.streams.get_mut(&stream_id) {
2532            match stream.state() {
2533                stream::State::StreamType => {
2534                    stream.try_fill_buffer(conn)?;
2535
2536                    let varint = match stream.try_consume_varint() {
2537                        Ok(v) => v,
2538
2539                        Err(_) => continue,
2540                    };
2541
2542                    let ty = stream::Type::deserialize(varint)?;
2543
2544                    if let Err(e) = stream.set_ty(ty) {
2545                        conn.close(true, e.to_wire(), b"")?;
2546                        return Err(e);
2547                    }
2548
2549                    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
2550                        let ty_val = if matches!(ty, stream::Type::Unknown) {
2551                            Some(varint)
2552                        } else {
2553                            None
2554                        };
2555
2556                        let ev_data =
2557                            EventData::Http3StreamTypeSet(StreamTypeSet {
2558                                stream_id,
2559                                initiator: Some(Initiator::Remote),
2560                                stream_type: ty.to_qlog(),
2561                                stream_type_bytes: ty_val,
2562                                ..Default::default()
2563                            });
2564
2565                        q.add_event_data_now(ev_data).ok();
2566                    });
2567
2568                    match &ty {
2569                        stream::Type::Control => {
2570                            // Only one control stream allowed.
2571                            if self.peer_control_stream_id.is_some() {
2572                                conn.close(
2573                                    true,
2574                                    Error::StreamCreationError.to_wire(),
2575                                    b"Received multiple control streams",
2576                                )?;
2577
2578                                return Err(Error::StreamCreationError);
2579                            }
2580
2581                            trace!(
2582                                "{} open peer's control stream {}",
2583                                conn.trace_id(),
2584                                stream_id
2585                            );
2586
2587                            close_conn_if_critical_stream_finished(
2588                                conn, stream_id,
2589                            )?;
2590
2591                            self.peer_control_stream_id = Some(stream_id);
2592                        },
2593
2594                        stream::Type::Push => {
2595                            // Only clients can receive push stream.
2596                            if self.is_server {
2597                                conn.close(
2598                                    true,
2599                                    Error::StreamCreationError.to_wire(),
2600                                    b"Server received push stream.",
2601                                )?;
2602
2603                                return Err(Error::StreamCreationError);
2604                            }
2605                        },
2606
2607                        stream::Type::QpackEncoder => {
2608                            // Only one qpack encoder stream allowed.
2609                            if self.peer_qpack_streams.encoder_stream_id.is_some()
2610                            {
2611                                conn.close(
2612                                    true,
2613                                    Error::StreamCreationError.to_wire(),
2614                                    b"Received multiple QPACK encoder streams",
2615                                )?;
2616
2617                                return Err(Error::StreamCreationError);
2618                            }
2619
2620                            close_conn_if_critical_stream_finished(
2621                                conn, stream_id,
2622                            )?;
2623
2624                            self.peer_qpack_streams.encoder_stream_id =
2625                                Some(stream_id);
2626                        },
2627
2628                        stream::Type::QpackDecoder => {
2629                            // Only one qpack decoder allowed.
2630                            if self.peer_qpack_streams.decoder_stream_id.is_some()
2631                            {
2632                                conn.close(
2633                                    true,
2634                                    Error::StreamCreationError.to_wire(),
2635                                    b"Received multiple QPACK decoder streams",
2636                                )?;
2637
2638                                return Err(Error::StreamCreationError);
2639                            }
2640
2641                            close_conn_if_critical_stream_finished(
2642                                conn, stream_id,
2643                            )?;
2644
2645                            self.peer_qpack_streams.decoder_stream_id =
2646                                Some(stream_id);
2647                        },
2648
2649                        stream::Type::Unknown => {
2650                            // Unknown stream types are ignored.
2651                            // TODO: we MAY send STOP_SENDING
2652                        },
2653
2654                        stream::Type::Request => unreachable!(),
2655                    }
2656                },
2657
2658                stream::State::PushId => {
2659                    stream.try_fill_buffer(conn)?;
2660
2661                    let varint = match stream.try_consume_varint() {
2662                        Ok(v) => v,
2663
2664                        Err(_) => continue,
2665                    };
2666
2667                    if let Err(e) = stream.set_push_id(varint) {
2668                        conn.close(true, e.to_wire(), b"")?;
2669                        return Err(e);
2670                    }
2671                },
2672
2673                stream::State::FrameType => {
2674                    stream.try_fill_buffer(conn)?;
2675
2676                    let varint = match stream.try_consume_varint() {
2677                        Ok(v) => v,
2678
2679                        Err(_) => continue,
2680                    };
2681
2682                    match stream.set_frame_type(varint) {
2683                        Err(Error::FrameUnexpected) => {
2684                            let msg = format!("Unexpected frame type {varint}");
2685
2686                            conn.close(
2687                                true,
2688                                Error::FrameUnexpected.to_wire(),
2689                                msg.as_bytes(),
2690                            )?;
2691
2692                            return Err(Error::FrameUnexpected);
2693                        },
2694
2695                        Err(e) => {
2696                            conn.close(
2697                                true,
2698                                e.to_wire(),
2699                                b"Error handling frame.",
2700                            )?;
2701
2702                            return Err(e);
2703                        },
2704
2705                        _ => (),
2706                    }
2707                },
2708
2709                stream::State::FramePayloadLen => {
2710                    stream.try_fill_buffer(conn)?;
2711
2712                    let payload_len = match stream.try_consume_varint() {
2713                        Ok(v) => v,
2714
2715                        Err(_) => continue,
2716                    };
2717
2718                    // DATA frames are handled uniquely. After this point we lose
2719                    // visibility of DATA framing, so just log here.
2720                    if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
2721                        trace!(
2722                            "{} rx frm DATA stream={} wire_payload_len={}",
2723                            conn.trace_id(),
2724                            stream_id,
2725                            payload_len
2726                        );
2727
2728                        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2729                            let frame = Http3Frame::Data { raw: None };
2730
2731                            let ev_data =
2732                                EventData::Http3FrameParsed(FrameParsed {
2733                                    stream_id,
2734                                    length: Some(payload_len),
2735                                    frame,
2736                                    ..Default::default()
2737                                });
2738
2739                            q.add_event_data_now(ev_data).ok();
2740                        });
2741                    }
2742
2743                    if let Err(e) = stream.set_frame_payload_len(payload_len) {
2744                        conn.close(true, e.to_wire(), b"")?;
2745                        return Err(e);
2746                    }
2747                },
2748
2749                stream::State::FramePayload => {
2750                    // Do not emit events when not polling.
2751                    if !polling {
2752                        break;
2753                    }
2754
2755                    stream.try_fill_buffer(conn)?;
2756
2757                    let (frame, payload_len) = match stream.try_consume_frame() {
2758                        Ok(frame) => frame,
2759
2760                        Err(Error::Done) => return Err(Error::Done),
2761
2762                        Err(e) => {
2763                            conn.close(
2764                                true,
2765                                e.to_wire(),
2766                                b"Error handling frame.",
2767                            )?;
2768
2769                            return Err(e);
2770                        },
2771                    };
2772
2773                    match self.process_frame(conn, stream_id, frame, payload_len)
2774                    {
2775                        Ok(ev) => return Ok(ev),
2776
2777                        Err(Error::Done) => {
2778                            // This might be a frame that is processed internally
2779                            // without needing to bubble up to the user as an
2780                            // event. Check whether the frame has FIN'd by QUIC
2781                            // to prevent trying to read again on a closed stream.
2782                            if conn.stream_finished(stream_id) {
2783                                break;
2784                            }
2785                        },
2786
2787                        Err(e) => return Err(e),
2788                    };
2789                },
2790
2791                stream::State::Data => {
2792                    // Do not emit events when not polling.
2793                    if !polling {
2794                        break;
2795                    }
2796
2797                    if !stream.try_trigger_data_event() {
2798                        break;
2799                    }
2800
2801                    return Ok((stream_id, Event::Data));
2802                },
2803
2804                stream::State::QpackInstruction => {
2805                    let mut d = [0; 4096];
2806
2807                    // Read data from the stream and discard immediately.
2808                    loop {
2809                        let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;
2810
2811                        match stream.ty() {
2812                            Some(stream::Type::QpackEncoder) =>
2813                                self.peer_qpack_streams.encoder_stream_bytes +=
2814                                    recv as u64,
2815                            Some(stream::Type::QpackDecoder) =>
2816                                self.peer_qpack_streams.decoder_stream_bytes +=
2817                                    recv as u64,
2818                            _ => unreachable!(),
2819                        };
2820
2821                        if fin {
2822                            close_conn_critical_stream(conn)?;
2823                        }
2824                    }
2825                },
2826
2827                stream::State::Drain => {
2828                    // Discard incoming data on the stream.
2829                    conn.stream_shutdown(
2830                        stream_id,
2831                        crate::Shutdown::Read,
2832                        0x100,
2833                    )?;
2834
2835                    break;
2836                },
2837
2838                stream::State::Finished => break,
2839            }
2840        }
2841
2842        Err(Error::Done)
2843    }
2844
2845    fn process_finished_stream(&mut self, stream_id: u64) {
2846        let stream = match self.streams.get_mut(&stream_id) {
2847            Some(v) => v,
2848
2849            None => return,
2850        };
2851
2852        if stream.state() == stream::State::Finished {
2853            return;
2854        }
2855
2856        match stream.ty() {
2857            Some(stream::Type::Request) | Some(stream::Type::Push) => {
2858                stream.finished();
2859
2860                self.finished_streams.push_back(stream_id);
2861            },
2862
2863            _ => (),
2864        };
2865    }
2866
2867    fn process_frame<F: BufFactory>(
2868        &mut self, conn: &mut super::Connection<F>, stream_id: u64,
2869        frame: frame::Frame, payload_len: u64,
2870    ) -> Result<(u64, Event)> {
2871        trace!(
2872            "{} rx frm {:?} stream={} payload_len={}",
2873            conn.trace_id(),
2874            frame,
2875            stream_id,
2876            payload_len
2877        );
2878
2879        qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2880            // HEADERS frames are special case and will be logged below.
2881            if !matches!(frame, frame::Frame::Headers { .. }) {
2882                let frame = frame.to_qlog();
2883                let ev_data = EventData::Http3FrameParsed(FrameParsed {
2884                    stream_id,
2885                    length: Some(payload_len),
2886                    frame,
2887                    ..Default::default()
2888                });
2889
2890                q.add_event_data_now(ev_data).ok();
2891            }
2892        });
2893
2894        match frame {
2895            frame::Frame::Settings {
2896                max_field_section_size,
2897                qpack_max_table_capacity,
2898                qpack_blocked_streams,
2899                connect_protocol_enabled,
2900                h3_datagram,
2901                additional_settings,
2902                raw,
2903                ..
2904            } => {
2905                self.peer_settings = ConnectionSettings {
2906                    max_field_section_size,
2907                    qpack_max_table_capacity,
2908                    qpack_blocked_streams,
2909                    connect_protocol_enabled,
2910                    h3_datagram,
2911                    additional_settings,
2912                    raw,
2913                };
2914
2915                if let Some(1) = h3_datagram {
2916                    // The peer MUST have also enabled DATAGRAM with a TP
2917                    if conn.dgram_max_writable_len().is_none() {
2918                        conn.close(
2919                            true,
2920                            Error::SettingsError.to_wire(),
2921                            b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
2922                        )?;
2923
2924                        return Err(Error::SettingsError);
2925                    }
2926                }
2927            },
2928
2929            frame::Frame::Headers { header_block } => {
2930                // Servers reject too many HEADERS frames.
2931                if let Some(s) = self.streams.get_mut(&stream_id) {
2932                    if self.is_server && s.headers_received_count() == 2 {
2933                        conn.close(
2934                            true,
2935                            Error::FrameUnexpected.to_wire(),
2936                            b"Too many HEADERS frames",
2937                        )?;
2938                        return Err(Error::FrameUnexpected);
2939                    }
2940
2941                    s.increment_headers_received();
2942                }
2943
2944                // Use "infinite" as default value for max_field_section_size if
2945                // it is not configured by the application.
2946                let max_size = self
2947                    .local_settings
2948                    .max_field_section_size
2949                    .unwrap_or(u64::MAX);
2950
2951                let headers = match self
2952                    .qpack_decoder
2953                    .decode(&header_block[..], max_size)
2954                {
2955                    Ok(v) => v,
2956
2957                    Err(e) => {
2958                        let e = match e {
2959                            qpack::Error::HeaderListTooLarge =>
2960                                Error::ExcessiveLoad,
2961
2962                            _ => Error::QpackDecompressionFailed,
2963                        };
2964
2965                        conn.close(true, e.to_wire(), b"Error parsing headers.")?;
2966
2967                        return Err(e);
2968                    },
2969                };
2970
2971                qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
2972                    let qlog_headers = headers
2973                        .iter()
2974                        .map(|h| qlog::events::http3::HttpHeader {
2975                            name: Some(
2976                                String::from_utf8_lossy(h.name()).into_owned(),
2977                            ),
2978                            name_bytes: None,
2979                            value: Some(
2980                                String::from_utf8_lossy(h.value()).into_owned(),
2981                            ),
2982                            value_bytes: None,
2983                        })
2984                        .collect();
2985
2986                    let frame = Http3Frame::Headers {
2987                        headers: qlog_headers,
2988                        raw: None,
2989                    };
2990
2991                    let ev_data = EventData::Http3FrameParsed(FrameParsed {
2992                        stream_id,
2993                        length: Some(payload_len),
2994                        frame,
2995                        ..Default::default()
2996                    });
2997
2998                    q.add_event_data_now(ev_data).ok();
2999                });
3000
3001                let more_frames = !conn.stream_finished(stream_id);
3002
3003                return Ok((stream_id, Event::Headers {
3004                    list: headers,
3005                    more_frames,
3006                }));
3007            },
3008
3009            frame::Frame::Data { .. } => {
3010                // Do nothing. The Data event is returned separately.
3011            },
3012
3013            frame::Frame::GoAway { id } => {
3014                if !self.is_server && id % 4 != 0 {
3015                    conn.close(
3016                        true,
3017                        Error::FrameUnexpected.to_wire(),
3018                        b"GOAWAY received with ID of non-request stream",
3019                    )?;
3020
3021                    return Err(Error::IdError);
3022                }
3023
3024                if let Some(received_id) = self.peer_goaway_id {
3025                    if id > received_id {
3026                        conn.close(
3027                            true,
3028                            Error::IdError.to_wire(),
3029                            b"GOAWAY received with ID larger than previously received",
3030                        )?;
3031
3032                        return Err(Error::IdError);
3033                    }
3034                }
3035
3036                self.peer_goaway_id = Some(id);
3037
3038                return Ok((id, Event::GoAway));
3039            },
3040
3041            frame::Frame::MaxPushId { push_id } => {
3042                if !self.is_server {
3043                    conn.close(
3044                        true,
3045                        Error::FrameUnexpected.to_wire(),
3046                        b"MAX_PUSH_ID received by client",
3047                    )?;
3048
3049                    return Err(Error::FrameUnexpected);
3050                }
3051
3052                if push_id < self.max_push_id {
3053                    conn.close(
3054                        true,
3055                        Error::IdError.to_wire(),
3056                        b"MAX_PUSH_ID reduced limit",
3057                    )?;
3058
3059                    return Err(Error::IdError);
3060                }
3061
3062                self.max_push_id = push_id;
3063            },
3064
3065            frame::Frame::PushPromise { .. } => {
3066                if self.is_server {
3067                    conn.close(
3068                        true,
3069                        Error::FrameUnexpected.to_wire(),
3070                        b"PUSH_PROMISE received by server",
3071                    )?;
3072
3073                    return Err(Error::FrameUnexpected);
3074                }
3075
3076                if stream_id % 4 != 0 {
3077                    conn.close(
3078                        true,
3079                        Error::FrameUnexpected.to_wire(),
3080                        b"PUSH_PROMISE received on non-request stream",
3081                    )?;
3082
3083                    return Err(Error::FrameUnexpected);
3084                }
3085
3086                // TODO: implement more checks and PUSH_PROMISE event
3087            },
3088
3089            frame::Frame::CancelPush { .. } => {
3090                // TODO: implement CANCEL_PUSH frame
3091            },
3092
3093            frame::Frame::PriorityUpdateRequest {
3094                prioritized_element_id,
3095                priority_field_value,
3096            } => {
3097                if !self.is_server {
3098                    conn.close(
3099                        true,
3100                        Error::FrameUnexpected.to_wire(),
3101                        b"PRIORITY_UPDATE received by client",
3102                    )?;
3103
3104                    return Err(Error::FrameUnexpected);
3105                }
3106
3107                if prioritized_element_id % 4 != 0 {
3108                    conn.close(
3109                        true,
3110                        Error::FrameUnexpected.to_wire(),
3111                        b"PRIORITY_UPDATE for request stream type with wrong ID",
3112                    )?;
3113
3114                    return Err(Error::FrameUnexpected);
3115                }
3116
3117                if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
3118                    conn.close(
3119                        true,
3120                        Error::IdError.to_wire(),
3121                        b"PRIORITY_UPDATE for request stream beyond max streams limit",
3122                    )?;
3123
3124                    return Err(Error::IdError);
3125                }
3126
3127                // If the PRIORITY_UPDATE is valid, consider storing the latest
3128                // contents. Due to reordering, it is possible that we might
3129                // receive frames that reference streams that have not yet to
3130                // been opened and that's OK because it's within our concurrency
3131                // limit. However, we discard PRIORITY_UPDATE that refers to
3132                // streams that we know have been collected.
3133                if conn.streams.is_collected(prioritized_element_id) {
3134                    return Err(Error::Done);
3135                }
3136
3137                // If the stream did not yet exist, create it and store.
3138                let stream =
3139                    self.streams.entry(prioritized_element_id).or_insert_with(
3140                        || <stream::Stream>::new(prioritized_element_id, false),
3141                    );
3142
3143                let had_priority_update = stream.has_last_priority_update();
3144                stream.set_last_priority_update(Some(priority_field_value));
3145
3146                // Only trigger the event when there wasn't already a stored
3147                // PRIORITY_UPDATE.
3148                if !had_priority_update {
3149                    return Ok((prioritized_element_id, Event::PriorityUpdate));
3150                } else {
3151                    return Err(Error::Done);
3152                }
3153            },
3154
3155            frame::Frame::PriorityUpdatePush {
3156                prioritized_element_id,
3157                ..
3158            } => {
3159                if !self.is_server {
3160                    conn.close(
3161                        true,
3162                        Error::FrameUnexpected.to_wire(),
3163                        b"PRIORITY_UPDATE received by client",
3164                    )?;
3165
3166                    return Err(Error::FrameUnexpected);
3167                }
3168
3169                if prioritized_element_id % 3 != 0 {
3170                    conn.close(
3171                        true,
3172                        Error::FrameUnexpected.to_wire(),
3173                        b"PRIORITY_UPDATE for push stream type with wrong ID",
3174                    )?;
3175
3176                    return Err(Error::FrameUnexpected);
3177                }
3178
3179                // TODO: we only implement this if we implement server push
3180            },
3181
3182            frame::Frame::Unknown { .. } => (),
3183        }
3184
3185        Err(Error::Done)
3186    }
3187
3188    /// Collects and returns statistics about the connection.
3189    #[inline]
3190    pub fn stats(&self) -> Stats {
3191        Stats {
3192            qpack_encoder_stream_recv_bytes: self
3193                .peer_qpack_streams
3194                .encoder_stream_bytes,
3195            qpack_decoder_stream_recv_bytes: self
3196                .peer_qpack_streams
3197                .decoder_stream_bytes,
3198        }
3199    }
3200}
3201
3202/// Generates an HTTP/3 GREASE variable length integer.
3203pub fn grease_value() -> u64 {
3204    let n = super::rand::rand_u64_uniform(148_764_065_110_560_899);
3205    31 * n + 33
3206}
3207
3208#[doc(hidden)]
3209#[cfg(any(test, feature = "internal"))]
3210pub mod testing {
3211    use super::*;
3212
3213    use crate::test_utils;
3214    use crate::DefaultBufFactory;
3215
3216    /// Session is an HTTP/3 test helper structure. It holds a client, server
3217    /// and pipe that allows them to communicate.
3218    ///
3219    /// `default()` creates a session with some sensible default
3220    /// configuration. `with_configs()` allows for providing a specific
3221    /// configuration.
3222    ///
3223    /// `handshake()` performs all the steps needed to establish an HTTP/3
3224    /// connection.
3225    ///
3226    /// Some utility functions are provided that make it less verbose to send
3227    /// request, responses and individual headers. The full quiche API remains
3228    /// available for any test that need to do unconventional things (such as
3229    /// bad behaviour that triggers errors).
3230    pub struct Session<F = DefaultBufFactory>
3231    where
3232        F: BufFactory,
3233    {
3234        pub pipe: test_utils::Pipe<F>,
3235        pub client: Connection,
3236        pub server: Connection,
3237    }
3238
3239    impl Session {
3240        pub fn new() -> Result<Session> {
3241            Session::<DefaultBufFactory>::new_with_buf()
3242        }
3243
3244        pub fn with_configs(
3245            config: &mut crate::Config, h3_config: &Config,
3246        ) -> Result<Session> {
3247            Session::<DefaultBufFactory>::with_configs_and_buf(config, h3_config)
3248        }
3249
3250        pub fn default_configs() -> Result<(crate::Config, Config)> {
3251            fn path_relative_to_manifest_dir(path: &str) -> String {
3252                std::fs::canonicalize(
3253                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path),
3254                )
3255                .unwrap()
3256                .to_string_lossy()
3257                .into_owned()
3258            }
3259
3260            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
3261            config.load_cert_chain_from_pem_file(
3262                &path_relative_to_manifest_dir("examples/cert.crt"),
3263            )?;
3264            config.load_priv_key_from_pem_file(
3265                &path_relative_to_manifest_dir("examples/cert.key"),
3266            )?;
3267            config.set_application_protos(&[b"h3"])?;
3268            config.set_initial_max_data(1500);
3269            config.set_initial_max_stream_data_bidi_local(150);
3270            config.set_initial_max_stream_data_bidi_remote(150);
3271            config.set_initial_max_stream_data_uni(150);
3272            config.set_initial_max_streams_bidi(5);
3273            config.set_initial_max_streams_uni(5);
3274            config.verify_peer(false);
3275            config.enable_dgram(true, 3, 3);
3276            config.set_ack_delay_exponent(8);
3277
3278            let h3_config = Config::new()?;
3279            Ok((config, h3_config))
3280        }
3281    }
3282
3283    impl<F: BufFactory> Session<F> {
3284        pub fn new_with_buf() -> Result<Session<F>> {
3285            let (mut config, h3_config) = Session::default_configs()?;
3286            Session::with_configs_and_buf(&mut config, &h3_config)
3287        }
3288
3289        pub fn with_configs_and_buf(
3290            config: &mut crate::Config, h3_config: &Config,
3291        ) -> Result<Session<F>> {
3292            let pipe = test_utils::Pipe::with_config_and_buf(config)?;
3293            let client_dgram = pipe.client.dgram_enabled();
3294            let server_dgram = pipe.server.dgram_enabled();
3295            Ok(Session {
3296                pipe,
3297                client: Connection::new(h3_config, false, client_dgram)?,
3298                server: Connection::new(h3_config, true, server_dgram)?,
3299            })
3300        }
3301
3302        /// Do the HTTP/3 handshake so both ends are in sane initial state.
3303        pub fn handshake(&mut self) -> Result<()> {
3304            self.pipe.handshake()?;
3305
3306            // Client streams.
3307            self.client.send_settings(&mut self.pipe.client)?;
3308            self.pipe.advance().ok();
3309
3310            self.client
3311                .open_qpack_encoder_stream(&mut self.pipe.client)?;
3312            self.pipe.advance().ok();
3313
3314            self.client
3315                .open_qpack_decoder_stream(&mut self.pipe.client)?;
3316            self.pipe.advance().ok();
3317
3318            if self.pipe.client.grease {
3319                self.client.open_grease_stream(&mut self.pipe.client)?;
3320            }
3321
3322            self.pipe.advance().ok();
3323
3324            // Server streams.
3325            self.server.send_settings(&mut self.pipe.server)?;
3326            self.pipe.advance().ok();
3327
3328            self.server
3329                .open_qpack_encoder_stream(&mut self.pipe.server)?;
3330            self.pipe.advance().ok();
3331
3332            self.server
3333                .open_qpack_decoder_stream(&mut self.pipe.server)?;
3334            self.pipe.advance().ok();
3335
3336            if self.pipe.server.grease {
3337                self.server.open_grease_stream(&mut self.pipe.server)?;
3338            }
3339
3340            self.advance().ok();
3341
3342            while self.client.poll(&mut self.pipe.client).is_ok() {
3343                // Do nothing.
3344            }
3345
3346            while self.server.poll(&mut self.pipe.server).is_ok() {
3347                // Do nothing.
3348            }
3349
3350            Ok(())
3351        }
3352
3353        /// Advances the session pipe over the buffer.
3354        pub fn advance(&mut self) -> crate::Result<()> {
3355            self.pipe.advance()
3356        }
3357
3358        /// Polls the client for events.
3359        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
3360            self.client.poll(&mut self.pipe.client)
3361        }
3362
3363        /// Polls the server for events.
3364        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
3365            self.server.poll(&mut self.pipe.server)
3366        }
3367
3368        /// Sends a request from client with default headers.
3369        ///
3370        /// On success it returns the newly allocated stream and the headers.
3371        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
3372            let req = vec![
3373                Header::new(b":method", b"GET"),
3374                Header::new(b":scheme", b"https"),
3375                Header::new(b":authority", b"quic.tech"),
3376                Header::new(b":path", b"/test"),
3377                Header::new(b"user-agent", b"quiche-test"),
3378            ];
3379
3380            let stream =
3381                self.client.send_request(&mut self.pipe.client, &req, fin)?;
3382
3383            self.advance().ok();
3384
3385            Ok((stream, req))
3386        }
3387
3388        /// Sends a response from server with default headers.
3389        ///
3390        /// On success it returns the headers.
3391        pub fn send_response(
3392            &mut self, stream: u64, fin: bool,
3393        ) -> Result<Vec<Header>> {
3394            let resp = vec![
3395                Header::new(b":status", b"200"),
3396                Header::new(b"server", b"quiche-test"),
3397            ];
3398
3399            self.server.send_response(
3400                &mut self.pipe.server,
3401                stream,
3402                &resp,
3403                fin,
3404            )?;
3405
3406            self.advance().ok();
3407
3408            Ok(resp)
3409        }
3410
3411        /// Sends some default payload from client.
3412        ///
3413        /// On success it returns the payload.
3414        pub fn send_body_client(
3415            &mut self, stream: u64, fin: bool,
3416        ) -> Result<Vec<u8>> {
3417            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3418
3419            self.client
3420                .send_body(&mut self.pipe.client, stream, &bytes, fin)?;
3421
3422            self.advance().ok();
3423
3424            Ok(bytes)
3425        }
3426
3427        /// Fetches DATA payload from the server.
3428        ///
3429        /// On success it returns the number of bytes received.
3430        pub fn recv_body_client(
3431            &mut self, stream: u64, buf: &mut [u8],
3432        ) -> Result<usize> {
3433            self.client.recv_body(&mut self.pipe.client, stream, buf)
3434        }
3435
3436        /// Fetches DATA payload from the server.
3437        ///
3438        /// On success it returns the number of bytes received.
3439        pub fn recv_body_buf_client<B: bytes::BufMut>(
3440            &mut self, stream: u64, buf: B,
3441        ) -> Result<usize> {
3442            self.client
3443                .recv_body_buf(&mut self.pipe.client, stream, buf)
3444        }
3445
3446        /// Sends some default payload from server.
3447        ///
3448        /// On success it returns the payload.
3449        pub fn send_body_server(
3450            &mut self, stream: u64, fin: bool,
3451        ) -> Result<Vec<u8>> {
3452            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3453
3454            self.server
3455                .send_body(&mut self.pipe.server, stream, &bytes, fin)?;
3456
3457            self.advance().ok();
3458
3459            Ok(bytes)
3460        }
3461
3462        /// Fetches DATA payload from the client.
3463        ///
3464        /// On success it returns the number of bytes received.
3465        pub fn recv_body_server(
3466            &mut self, stream: u64, buf: &mut [u8],
3467        ) -> Result<usize> {
3468            self.server.recv_body(&mut self.pipe.server, stream, buf)
3469        }
3470
3471        /// Fetches DATA payload from the client.
3472        ///
3473        /// On success it returns the number of bytes received.
3474        pub fn recv_body_buf_server<B: bytes::BufMut>(
3475            &mut self, stream: u64, buf: B,
3476        ) -> Result<usize> {
3477            self.server
3478                .recv_body_buf(&mut self.pipe.server, stream, buf)
3479        }
3480
3481        /// Sends a single HTTP/3 frame from the client.
3482        pub fn send_frame_client(
3483            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3484        ) -> Result<()> {
3485            let mut d = [42; 65535];
3486
3487            let mut b = octets::OctetsMut::with_slice(&mut d);
3488
3489            frame.to_bytes(&mut b)?;
3490
3491            let off = b.off();
3492            self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
3493
3494            self.advance().ok();
3495
3496            Ok(())
3497        }
3498
3499        /// Send an HTTP/3 DATAGRAM with default data from the client.
3500        ///
3501        /// On success it returns the data.
3502        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3503            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3504            let len = octets::varint_len(flow_id) + bytes.len();
3505            let mut d = vec![0; len];
3506            let mut b = octets::OctetsMut::with_slice(&mut d);
3507
3508            b.put_varint(flow_id)?;
3509            b.put_bytes(&bytes)?;
3510
3511            self.pipe.client.dgram_send(&d)?;
3512
3513            self.advance().ok();
3514
3515            Ok(bytes)
3516        }
3517
3518        /// Receives an HTTP/3 DATAGRAM from the server.
3519        ///
3520        /// On success it returns the DATAGRAM length, flow ID and flow ID
3521        /// length.
3522        pub fn recv_dgram_client(
3523            &mut self, buf: &mut [u8],
3524        ) -> Result<(usize, u64, usize)> {
3525            let len = self.pipe.client.dgram_recv(buf)?;
3526            let mut b = octets::Octets::with_slice(buf);
3527            let flow_id = b.get_varint()?;
3528
3529            Ok((len, flow_id, b.off()))
3530        }
3531
3532        /// Send an HTTP/3 DATAGRAM with default data from the server
3533        ///
3534        /// On success it returns the data.
3535        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
3536            let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
3537            let len = octets::varint_len(flow_id) + bytes.len();
3538            let mut d = vec![0; len];
3539            let mut b = octets::OctetsMut::with_slice(&mut d);
3540
3541            b.put_varint(flow_id)?;
3542            b.put_bytes(&bytes)?;
3543
3544            self.pipe.server.dgram_send(&d)?;
3545
3546            self.advance().ok();
3547
3548            Ok(bytes)
3549        }
3550
3551        /// Receives an HTTP/3 DATAGRAM from the client.
3552        ///
3553        /// On success it returns the DATAGRAM length, flow ID and flow ID
3554        /// length.
3555        pub fn recv_dgram_server(
3556            &mut self, buf: &mut [u8],
3557        ) -> Result<(usize, u64, usize)> {
3558            let len = self.pipe.server.dgram_recv(buf)?;
3559            let mut b = octets::Octets::with_slice(buf);
3560            let flow_id = b.get_varint()?;
3561
3562            Ok((len, flow_id, b.off()))
3563        }
3564
3565        /// Sends a single HTTP/3 frame from the server.
3566        pub fn send_frame_server(
3567            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
3568        ) -> Result<()> {
3569            let mut d = [42; 65535];
3570
3571            let mut b = octets::OctetsMut::with_slice(&mut d);
3572
3573            frame.to_bytes(&mut b)?;
3574
3575            let off = b.off();
3576            self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
3577
3578            self.advance().ok();
3579
3580            Ok(())
3581        }
3582
3583        /// Sends an arbitrary buffer of HTTP/3 stream data from the client.
3584        pub fn send_arbitrary_stream_data_client(
3585            &mut self, data: &[u8], stream_id: u64, fin: bool,
3586        ) -> Result<()> {
3587            self.pipe.client.stream_send(stream_id, data, fin)?;
3588
3589            self.advance().ok();
3590
3591            Ok(())
3592        }
3593
3594        /// Sends an arbitrary buffer of HTTP/3 stream data from the server.
3595        pub fn send_arbitrary_stream_data_server(
3596            &mut self, data: &[u8], stream_id: u64, fin: bool,
3597        ) -> Result<()> {
3598            self.pipe.server.stream_send(stream_id, data, fin)?;
3599
3600            self.advance().ok();
3601
3602            Ok(())
3603        }
3604    }
3605}
3606
3607#[cfg(test)]
3608mod tests {
3609    use bytes::BufMut as _;
3610
3611    use super::*;
3612
3613    use super::testing::*;
3614
3615    #[test]
3616    /// Make sure that random GREASE values is within the specified limit.
3617    fn grease_value_in_varint_limit() {
3618        assert!(grease_value() < 2u64.pow(62) - 1);
3619    }
3620
3621    #[cfg(not(feature = "openssl"))] // 0-RTT not supported when using openssl/quictls
3622    #[test]
3623    fn h3_handshake_0rtt() {
3624        let mut buf = [0; 65535];
3625
3626        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
3627        config
3628            .load_cert_chain_from_pem_file("examples/cert.crt")
3629            .unwrap();
3630        config
3631            .load_priv_key_from_pem_file("examples/cert.key")
3632            .unwrap();
3633        config
3634            .set_application_protos(&[b"proto1", b"proto2"])
3635            .unwrap();
3636        config.set_initial_max_data(30);
3637        config.set_initial_max_stream_data_bidi_local(15);
3638        config.set_initial_max_stream_data_bidi_remote(15);
3639        config.set_initial_max_stream_data_uni(15);
3640        config.set_initial_max_streams_bidi(3);
3641        config.set_initial_max_streams_uni(3);
3642        config.enable_early_data();
3643        config.verify_peer(false);
3644
3645        let h3_config = Config::new().unwrap();
3646
3647        // Perform initial handshake.
3648        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3649        assert_eq!(pipe.handshake(), Ok(()));
3650
3651        // Extract session,
3652        let session = pipe.client.session().unwrap();
3653
3654        // Configure session on new connection.
3655        let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
3656        assert_eq!(pipe.client.set_session(session), Ok(()));
3657
3658        // Can't create an H3 connection until the QUIC connection is determined
3659        // to have made sufficient early data progress.
3660        assert!(matches!(
3661            Connection::with_transport(&mut pipe.client, &h3_config),
3662            Err(Error::InternalError)
3663        ));
3664
3665        // Client sends initial flight.
3666        let (len, _) = pipe.client.send(&mut buf).unwrap();
3667
3668        // Now an H3 connection can be created.
3669        assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
3670        assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));
3671
3672        // Client sends 0-RTT packet.
3673        let pkt_type = crate::packet::Type::ZeroRTT;
3674
3675        let frames = [crate::frame::Frame::Stream {
3676            stream_id: 6,
3677            data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
3678        }];
3679
3680        assert_eq!(
3681            pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
3682            Ok(1200)
3683        );
3684
3685        assert_eq!(pipe.server.undecryptable_pkts.len(), 0);
3686
3687        // 0-RTT stream data is readable.
3688        let mut r = pipe.server.readable();
3689        assert_eq!(r.next(), Some(6));
3690        assert_eq!(r.next(), None);
3691
3692        let mut b = [0; 15];
3693        assert_eq!(pipe.server.stream_recv(6, &mut b), Ok((5, true)));
3694        assert_eq!(&b[..5], b"aaaaa");
3695    }
3696
3697    #[test]
3698    /// Send a request with no body, get a response with no body.
3699    fn request_no_body_response_no_body() {
3700        let mut s = Session::new().unwrap();
3701        s.handshake().unwrap();
3702
3703        let (stream, req) = s.send_request(true).unwrap();
3704
3705        assert_eq!(stream, 0);
3706
3707        let ev_headers = Event::Headers {
3708            list: req,
3709            more_frames: false,
3710        };
3711
3712        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3713        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3714
3715        let resp = s.send_response(stream, true).unwrap();
3716
3717        let ev_headers = Event::Headers {
3718            list: resp,
3719            more_frames: false,
3720        };
3721
3722        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3723        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3724        assert_eq!(s.poll_client(), Err(Error::Done));
3725    }
3726
3727    #[test]
3728    /// Send a request with no body, get a response with one DATA frame.
3729    fn request_no_body_response_one_chunk() {
3730        let mut s = Session::new().unwrap();
3731        s.handshake().unwrap();
3732
3733        let (stream, req) = s.send_request(true).unwrap();
3734        assert_eq!(stream, 0);
3735
3736        let ev_headers = Event::Headers {
3737            list: req,
3738            more_frames: false,
3739        };
3740
3741        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3742
3743        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3744
3745        let resp = s.send_response(stream, false).unwrap();
3746
3747        let body = s.send_body_server(stream, true).unwrap();
3748
3749        let mut recv_buf = vec![0; body.len()];
3750
3751        let ev_headers = Event::Headers {
3752            list: resp,
3753            more_frames: true,
3754        };
3755
3756        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3757
3758        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3759        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3760
3761        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3762        assert_eq!(s.poll_client(), Err(Error::Done));
3763    }
3764
3765    #[test]
3766    /// Send a request with no body, get a response with multiple DATA frames.
3767    fn request_no_body_response_many_chunks() {
3768        let mut s = Session::new().unwrap();
3769        s.handshake().unwrap();
3770
3771        let (stream, req) = s.send_request(true).unwrap();
3772
3773        let ev_headers = Event::Headers {
3774            list: req,
3775            more_frames: false,
3776        };
3777
3778        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3779        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3780
3781        let total_data_frames = 4;
3782
3783        let resp = s.send_response(stream, false).unwrap();
3784
3785        for _ in 0..total_data_frames - 1 {
3786            s.send_body_server(stream, false).unwrap();
3787        }
3788
3789        let body = s.send_body_server(stream, true).unwrap();
3790
3791        let mut recv_buf = vec![0; body.len()];
3792
3793        let ev_headers = Event::Headers {
3794            list: resp,
3795            more_frames: true,
3796        };
3797
3798        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3799        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3800        assert_eq!(s.poll_client(), Err(Error::Done));
3801
3802        for _ in 0..total_data_frames {
3803            assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
3804        }
3805
3806        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3807        assert_eq!(s.poll_client(), Err(Error::Done));
3808    }
3809
3810    #[test]
3811    /// Send a request with no body, get a response with multiple DATA frames.
3812    fn request_no_body_response_many_chunks_with_buf() {
3813        let (mut config, h3_config) = Session::default_configs().unwrap();
3814        // we don't want to be limited by flow or cong. control
3815        config.set_initial_congestion_window_packets(100);
3816        config.set_initial_max_data(200_000);
3817        config.set_initial_max_stream_data_bidi_local(200_000);
3818        config.set_initial_max_stream_data_bidi_remote(200_000);
3819        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
3820        s.handshake().unwrap();
3821
3822        let (stream, req) = s.send_request(true).unwrap();
3823
3824        let ev_headers = Event::Headers {
3825            list: req,
3826            more_frames: false,
3827        };
3828
3829        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3830        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3831
3832        let total_data_frames = 4;
3833
3834        // Use a large body
3835        let data = vec![0xab_u8; 16 * 1024];
3836
3837        let resp = s.send_response(stream, false).unwrap();
3838
3839        for _ in 0..total_data_frames - 1 {
3840            assert_eq!(
3841                s.server.send_body(&mut s.pipe.server, stream, &data, false),
3842                Ok(data.len())
3843            );
3844            s.advance().ok();
3845        }
3846
3847        s.server
3848            .send_body(&mut s.pipe.server, stream, &data, true)
3849            .unwrap();
3850        s.advance().ok();
3851
3852        let ev_headers = Event::Headers {
3853            list: resp,
3854            more_frames: true,
3855        };
3856
3857        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3858        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
3859        assert_eq!(s.poll_client(), Err(Error::Done));
3860
3861        // We expect to be able to read multiple data frames in a single call and
3862        // reads don't have to end on frame boundaries. So let's try to read
3863        // 1.5 times the amount we sent in one frame.
3864        let how_much_to_read_per_call = data.len() * 2 / 3;
3865        let mut remaining_to_read = total_data_frames * data.len();
3866        let mut recv_buf = Vec::new().limit(how_much_to_read_per_call);
3867        assert_eq!(
3868            s.recv_body_buf_client(stream, &mut recv_buf),
3869            Ok(how_much_to_read_per_call)
3870        );
3871        remaining_to_read -= how_much_to_read_per_call;
3872        assert_eq!(recv_buf.get_ref().len(), how_much_to_read_per_call);
3873
3874        while remaining_to_read > 0 {
3875            // Set a different limit for the following reads.
3876            recv_buf.set_limit(data.len());
3877            // We should either read up to the limit we set above, or to
3878            // the end of buffered data.
3879            let expected = std::cmp::min(data.len(), remaining_to_read);
3880            assert_eq!(
3881                s.recv_body_buf_client(stream, &mut recv_buf),
3882                Ok(expected)
3883            );
3884            remaining_to_read -= expected;
3885        }
3886        // We've read everything now. Ensure the Vec reflects that
3887        assert_eq!(recv_buf.get_ref().len(), total_data_frames * data.len());
3888
3889        // No more data to read.
3890        assert_eq!(
3891            s.recv_body_buf_client(stream, &mut recv_buf),
3892            Err(Error::Done)
3893        );
3894
3895        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3896        assert_eq!(s.poll_client(), Err(Error::Done));
3897    }
3898
3899    #[test]
3900    /// Send a request with one DATA frame, get a response with no body.
3901    fn request_one_chunk_response_no_body() {
3902        let mut s = Session::new().unwrap();
3903        s.handshake().unwrap();
3904
3905        let (stream, req) = s.send_request(false).unwrap();
3906
3907        let body = s.send_body_client(stream, true).unwrap();
3908
3909        let mut recv_buf = vec![0; body.len()];
3910
3911        let ev_headers = Event::Headers {
3912            list: req,
3913            more_frames: true,
3914        };
3915
3916        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3917
3918        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3919        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3920
3921        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3922
3923        let resp = s.send_response(stream, true).unwrap();
3924
3925        let ev_headers = Event::Headers {
3926            list: resp,
3927            more_frames: false,
3928        };
3929
3930        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3931        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3932    }
3933
3934    #[test]
3935    /// Send a request with multiple DATA frames, get a response with no body.
3936    fn request_many_chunks_response_no_body() {
3937        let mut s = Session::new().unwrap();
3938        s.handshake().unwrap();
3939
3940        let (stream, req) = s.send_request(false).unwrap();
3941
3942        let total_data_frames = 4;
3943
3944        for _ in 0..total_data_frames - 1 {
3945            s.send_body_client(stream, false).unwrap();
3946        }
3947
3948        let body = s.send_body_client(stream, true).unwrap();
3949
3950        let mut recv_buf = vec![0; body.len()];
3951
3952        let ev_headers = Event::Headers {
3953            list: req,
3954            more_frames: true,
3955        };
3956
3957        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
3958        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
3959        assert_eq!(s.poll_server(), Err(Error::Done));
3960
3961        for _ in 0..total_data_frames {
3962            assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
3963        }
3964
3965        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
3966
3967        let resp = s.send_response(stream, true).unwrap();
3968
3969        let ev_headers = Event::Headers {
3970            list: resp,
3971            more_frames: false,
3972        };
3973
3974        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
3975        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
3976    }
3977
3978    #[test]
3979    /// Send a request with multiple DATA frames, get a response with one DATA
3980    /// frame.
3981    fn many_requests_many_chunks_response_one_chunk() {
3982        let mut s = Session::new().unwrap();
3983        s.handshake().unwrap();
3984
3985        let mut reqs = Vec::new();
3986
3987        let (stream1, req1) = s.send_request(false).unwrap();
3988        assert_eq!(stream1, 0);
3989        reqs.push(req1);
3990
3991        let (stream2, req2) = s.send_request(false).unwrap();
3992        assert_eq!(stream2, 4);
3993        reqs.push(req2);
3994
3995        let (stream3, req3) = s.send_request(false).unwrap();
3996        assert_eq!(stream3, 8);
3997        reqs.push(req3);
3998
3999        let body = s.send_body_client(stream1, false).unwrap();
4000        s.send_body_client(stream2, false).unwrap();
4001        s.send_body_client(stream3, false).unwrap();
4002
4003        let mut recv_buf = vec![0; body.len()];
4004
4005        // Reverse order of writes.
4006
4007        s.send_body_client(stream3, true).unwrap();
4008        s.send_body_client(stream2, true).unwrap();
4009        s.send_body_client(stream1, true).unwrap();
4010
4011        let (_, ev) = s.poll_server().unwrap();
4012        let ev_headers = Event::Headers {
4013            list: reqs[0].clone(),
4014            more_frames: true,
4015        };
4016        assert_eq!(ev, ev_headers);
4017
4018        let (_, ev) = s.poll_server().unwrap();
4019        let ev_headers = Event::Headers {
4020            list: reqs[1].clone(),
4021            more_frames: true,
4022        };
4023        assert_eq!(ev, ev_headers);
4024
4025        let (_, ev) = s.poll_server().unwrap();
4026        let ev_headers = Event::Headers {
4027            list: reqs[2].clone(),
4028            more_frames: true,
4029        };
4030        assert_eq!(ev, ev_headers);
4031
4032        assert_eq!(s.poll_server(), Ok((0, Event::Data)));
4033        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
4034        assert_eq!(s.poll_client(), Err(Error::Done));
4035        assert_eq!(s.recv_body_server(0, &mut recv_buf), Ok(body.len()));
4036        assert_eq!(s.poll_server(), Ok((0, Event::Finished)));
4037
4038        assert_eq!(s.poll_server(), Ok((4, Event::Data)));
4039        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
4040        assert_eq!(s.poll_client(), Err(Error::Done));
4041        assert_eq!(s.recv_body_server(4, &mut recv_buf), Ok(body.len()));
4042        assert_eq!(s.poll_server(), Ok((4, Event::Finished)));
4043
4044        assert_eq!(s.poll_server(), Ok((8, Event::Data)));
4045        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
4046        assert_eq!(s.poll_client(), Err(Error::Done));
4047        assert_eq!(s.recv_body_server(8, &mut recv_buf), Ok(body.len()));
4048        assert_eq!(s.poll_server(), Ok((8, Event::Finished)));
4049
4050        assert_eq!(s.poll_server(), Err(Error::Done));
4051
4052        let mut resps = Vec::new();
4053
4054        let resp1 = s.send_response(stream1, true).unwrap();
4055        resps.push(resp1);
4056
4057        let resp2 = s.send_response(stream2, true).unwrap();
4058        resps.push(resp2);
4059
4060        let resp3 = s.send_response(stream3, true).unwrap();
4061        resps.push(resp3);
4062
4063        for _ in 0..resps.len() {
4064            let (stream, ev) = s.poll_client().unwrap();
4065            let ev_headers = Event::Headers {
4066                list: resps[(stream / 4) as usize].clone(),
4067                more_frames: false,
4068            };
4069            assert_eq!(ev, ev_headers);
4070            assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4071        }
4072
4073        assert_eq!(s.poll_client(), Err(Error::Done));
4074    }
4075
4076    #[test]
4077    /// Send a request with no body, get a response with one DATA frame and an
4078    /// empty FIN after reception from the client.
4079    fn request_no_body_response_one_chunk_empty_fin() {
4080        let mut s = Session::new().unwrap();
4081        s.handshake().unwrap();
4082
4083        let (stream, req) = s.send_request(true).unwrap();
4084
4085        let ev_headers = Event::Headers {
4086            list: req,
4087            more_frames: false,
4088        };
4089
4090        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4091        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4092
4093        let resp = s.send_response(stream, false).unwrap();
4094
4095        let body = s.send_body_server(stream, false).unwrap();
4096
4097        let mut recv_buf = vec![0; body.len()];
4098
4099        let ev_headers = Event::Headers {
4100            list: resp,
4101            more_frames: true,
4102        };
4103
4104        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4105
4106        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
4107        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
4108
4109        assert_eq!(s.pipe.server.stream_send(stream, &[], true), Ok(0));
4110        s.advance().ok();
4111
4112        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4113        assert_eq!(s.poll_client(), Err(Error::Done));
4114    }
4115
4116    #[test]
4117    /// Send a request with no body, get a response with no body followed by
4118    /// GREASE that is STREAM frame with a FIN.
4119    fn request_no_body_response_no_body_with_grease() {
4120        let mut s = Session::new().unwrap();
4121        s.handshake().unwrap();
4122
4123        let (stream, req) = s.send_request(true).unwrap();
4124
4125        assert_eq!(stream, 0);
4126
4127        let ev_headers = Event::Headers {
4128            list: req,
4129            more_frames: false,
4130        };
4131
4132        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4133        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4134
4135        let resp = s.send_response(stream, false).unwrap();
4136
4137        let ev_headers = Event::Headers {
4138            list: resp,
4139            more_frames: true,
4140        };
4141
4142        // Inject a GREASE frame
4143        let mut d = [42; 10];
4144        let mut b = octets::OctetsMut::with_slice(&mut d);
4145
4146        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
4147        s.pipe.server.stream_send(0, frame_type, false).unwrap();
4148
4149        let frame_len = b.put_varint(10).unwrap();
4150        s.pipe.server.stream_send(0, frame_len, false).unwrap();
4151
4152        s.pipe.server.stream_send(0, &d, true).unwrap();
4153
4154        s.advance().ok();
4155
4156        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4157        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4158        assert_eq!(s.poll_client(), Err(Error::Done));
4159    }
4160
4161    #[test]
4162    /// Try to send DATA frames before HEADERS.
4163    fn body_response_before_headers() {
4164        let mut s = Session::new().unwrap();
4165        s.handshake().unwrap();
4166
4167        let (stream, req) = s.send_request(true).unwrap();
4168        assert_eq!(stream, 0);
4169
4170        let ev_headers = Event::Headers {
4171            list: req,
4172            more_frames: false,
4173        };
4174
4175        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4176
4177        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4178
4179        assert_eq!(
4180            s.send_body_server(stream, true),
4181            Err(Error::FrameUnexpected)
4182        );
4183
4184        assert_eq!(s.poll_client(), Err(Error::Done));
4185    }
4186
4187    #[test]
4188    /// Try to send DATA frames on wrong streams, ensure the API returns an
4189    /// error before anything hits the transport layer.
4190    fn send_body_invalid_client_stream() {
4191        let mut s = Session::new().unwrap();
4192        s.handshake().unwrap();
4193
4194        assert_eq!(s.send_body_client(0, true), Err(Error::FrameUnexpected));
4195
4196        assert_eq!(
4197            s.send_body_client(s.client.control_stream_id.unwrap(), true),
4198            Err(Error::FrameUnexpected)
4199        );
4200
4201        assert_eq!(
4202            s.send_body_client(
4203                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
4204                true
4205            ),
4206            Err(Error::FrameUnexpected)
4207        );
4208
4209        assert_eq!(
4210            s.send_body_client(
4211                s.client.local_qpack_streams.decoder_stream_id.unwrap(),
4212                true
4213            ),
4214            Err(Error::FrameUnexpected)
4215        );
4216
4217        assert_eq!(
4218            s.send_body_client(s.client.peer_control_stream_id.unwrap(), true),
4219            Err(Error::FrameUnexpected)
4220        );
4221
4222        assert_eq!(
4223            s.send_body_client(
4224                s.client.peer_qpack_streams.encoder_stream_id.unwrap(),
4225                true
4226            ),
4227            Err(Error::FrameUnexpected)
4228        );
4229
4230        assert_eq!(
4231            s.send_body_client(
4232                s.client.peer_qpack_streams.decoder_stream_id.unwrap(),
4233                true
4234            ),
4235            Err(Error::FrameUnexpected)
4236        );
4237    }
4238
4239    #[test]
4240    /// Try to send DATA frames on wrong streams, ensure the API returns an
4241    /// error before anything hits the transport layer.
4242    fn send_body_invalid_server_stream() {
4243        let mut s = Session::new().unwrap();
4244        s.handshake().unwrap();
4245
4246        assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
4247
4248        assert_eq!(
4249            s.send_body_server(s.server.control_stream_id.unwrap(), true),
4250            Err(Error::FrameUnexpected)
4251        );
4252
4253        assert_eq!(
4254            s.send_body_server(
4255                s.server.local_qpack_streams.encoder_stream_id.unwrap(),
4256                true
4257            ),
4258            Err(Error::FrameUnexpected)
4259        );
4260
4261        assert_eq!(
4262            s.send_body_server(
4263                s.server.local_qpack_streams.decoder_stream_id.unwrap(),
4264                true
4265            ),
4266            Err(Error::FrameUnexpected)
4267        );
4268
4269        assert_eq!(
4270            s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
4271            Err(Error::FrameUnexpected)
4272        );
4273
4274        assert_eq!(
4275            s.send_body_server(
4276                s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
4277                true
4278            ),
4279            Err(Error::FrameUnexpected)
4280        );
4281
4282        assert_eq!(
4283            s.send_body_server(
4284                s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
4285                true
4286            ),
4287            Err(Error::FrameUnexpected)
4288        );
4289    }
4290
4291    #[test]
4292    /// Client sends request with body and trailers.
4293    fn trailers() {
4294        let mut s = Session::new().unwrap();
4295        s.handshake().unwrap();
4296
4297        let (stream, req) = s.send_request(false).unwrap();
4298
4299        let body = s.send_body_client(stream, false).unwrap();
4300
4301        let mut recv_buf = vec![0; body.len()];
4302
4303        let req_trailers = vec![Header::new(b"foo", b"bar")];
4304
4305        s.client
4306            .send_additional_headers(
4307                &mut s.pipe.client,
4308                stream,
4309                &req_trailers,
4310                true,
4311                true,
4312            )
4313            .unwrap();
4314
4315        s.advance().ok();
4316
4317        let ev_headers = Event::Headers {
4318            list: req,
4319            more_frames: true,
4320        };
4321
4322        let ev_trailers = Event::Headers {
4323            list: req_trailers,
4324            more_frames: false,
4325        };
4326
4327        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4328
4329        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4330        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4331
4332        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4333        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4334    }
4335
4336    #[test]
4337    /// Server responds with a 103, then a 200 with no body.
4338    fn informational_response() {
4339        let mut s = Session::new().unwrap();
4340        s.handshake().unwrap();
4341
4342        let (stream, req) = s.send_request(true).unwrap();
4343
4344        assert_eq!(stream, 0);
4345
4346        let ev_headers = Event::Headers {
4347            list: req,
4348            more_frames: false,
4349        };
4350
4351        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4352        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4353
4354        let info_resp = vec![
4355            Header::new(b":status", b"103"),
4356            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4357        ];
4358
4359        let resp = vec![
4360            Header::new(b":status", b"200"),
4361            Header::new(b"server", b"quiche-test"),
4362        ];
4363
4364        s.server
4365            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4366            .unwrap();
4367
4368        s.server
4369            .send_additional_headers(
4370                &mut s.pipe.server,
4371                stream,
4372                &resp,
4373                false,
4374                true,
4375            )
4376            .unwrap();
4377
4378        s.advance().ok();
4379
4380        let ev_info_headers = Event::Headers {
4381            list: info_resp,
4382            more_frames: true,
4383        };
4384
4385        let ev_headers = Event::Headers {
4386            list: resp,
4387            more_frames: false,
4388        };
4389
4390        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4391        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
4392        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
4393        assert_eq!(s.poll_client(), Err(Error::Done));
4394    }
4395
4396    #[test]
4397    /// Server responds with a 103, then attempts to send a 200 using
4398    /// send_response again, which should fail.
4399    fn no_multiple_response() {
4400        let mut s = Session::new().unwrap();
4401        s.handshake().unwrap();
4402
4403        let (stream, req) = s.send_request(true).unwrap();
4404
4405        assert_eq!(stream, 0);
4406
4407        let ev_headers = Event::Headers {
4408            list: req,
4409            more_frames: false,
4410        };
4411
4412        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4413        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4414
4415        let info_resp = vec![
4416            Header::new(b":status", b"103"),
4417            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4418        ];
4419
4420        let resp = vec![
4421            Header::new(b":status", b"200"),
4422            Header::new(b"server", b"quiche-test"),
4423        ];
4424
4425        s.server
4426            .send_response(&mut s.pipe.server, stream, &info_resp, false)
4427            .unwrap();
4428
4429        assert_eq!(
4430            Err(Error::FrameUnexpected),
4431            s.server
4432                .send_response(&mut s.pipe.server, stream, &resp, true)
4433        );
4434
4435        s.advance().ok();
4436
4437        let ev_info_headers = Event::Headers {
4438            list: info_resp,
4439            more_frames: true,
4440        };
4441
4442        assert_eq!(s.poll_client(), Ok((stream, ev_info_headers)));
4443        assert_eq!(s.poll_client(), Err(Error::Done));
4444    }
4445
4446    #[test]
4447    /// Server attempts to use send_additional_headers before initial response.
4448    fn no_send_additional_before_initial_response() {
4449        let mut s = Session::new().unwrap();
4450        s.handshake().unwrap();
4451
4452        let (stream, req) = s.send_request(true).unwrap();
4453
4454        assert_eq!(stream, 0);
4455
4456        let ev_headers = Event::Headers {
4457            list: req,
4458            more_frames: false,
4459        };
4460
4461        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4462        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
4463
4464        let info_resp = vec![
4465            Header::new(b":status", b"103"),
4466            Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
4467        ];
4468
4469        assert_eq!(
4470            Err(Error::FrameUnexpected),
4471            s.server.send_additional_headers(
4472                &mut s.pipe.server,
4473                stream,
4474                &info_resp,
4475                false,
4476                false
4477            )
4478        );
4479
4480        s.advance().ok();
4481
4482        assert_eq!(s.poll_client(), Err(Error::Done));
4483    }
4484
4485    #[test]
4486    /// Client sends multiple HEADERS before data.
4487    fn additional_headers_before_data_client() {
4488        let mut s = Session::new().unwrap();
4489        s.handshake().unwrap();
4490
4491        let (stream, req) = s.send_request(false).unwrap();
4492
4493        let req_trailer = vec![Header::new(b"goodbye", b"world")];
4494
4495        assert_eq!(
4496            s.client.send_additional_headers(
4497                &mut s.pipe.client,
4498                stream,
4499                &req_trailer,
4500                true,
4501                false
4502            ),
4503            Ok(())
4504        );
4505
4506        s.advance().ok();
4507
4508        let ev_initial_headers = Event::Headers {
4509            list: req,
4510            more_frames: true,
4511        };
4512
4513        let ev_trailing_headers = Event::Headers {
4514            list: req_trailer,
4515            more_frames: true,
4516        };
4517
4518        assert_eq!(s.poll_server(), Ok((stream, ev_initial_headers)));
4519        assert_eq!(s.poll_server(), Ok((stream, ev_trailing_headers)));
4520        assert_eq!(s.poll_server(), Err(Error::Done));
4521    }
4522
4523    #[test]
4524    /// Client sends multiple HEADERS before data.
4525    fn data_after_trailers_client() {
4526        let mut s = Session::new().unwrap();
4527        s.handshake().unwrap();
4528
4529        let (stream, req) = s.send_request(false).unwrap();
4530
4531        let body = s.send_body_client(stream, false).unwrap();
4532
4533        let mut recv_buf = vec![0; body.len()];
4534
4535        let req_trailers = vec![Header::new(b"foo", b"bar")];
4536
4537        s.client
4538            .send_additional_headers(
4539                &mut s.pipe.client,
4540                stream,
4541                &req_trailers,
4542                true,
4543                false,
4544            )
4545            .unwrap();
4546
4547        s.advance().ok();
4548
4549        s.send_frame_client(
4550            frame::Frame::Data {
4551                payload: vec![1, 2, 3, 4],
4552            },
4553            stream,
4554            true,
4555        )
4556        .unwrap();
4557
4558        let ev_headers = Event::Headers {
4559            list: req,
4560            more_frames: true,
4561        };
4562
4563        let ev_trailers = Event::Headers {
4564            list: req_trailers,
4565            more_frames: true,
4566        };
4567
4568        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4569        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
4570        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
4571        assert_eq!(s.poll_server(), Ok((stream, ev_trailers)));
4572        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4573    }
4574
4575    #[test]
4576    /// Send a MAX_PUSH_ID frame from the client on a valid stream.
4577    fn max_push_id_from_client_good() {
4578        let mut s = Session::new().unwrap();
4579        s.handshake().unwrap();
4580
4581        s.send_frame_client(
4582            frame::Frame::MaxPushId { push_id: 1 },
4583            s.client.control_stream_id.unwrap(),
4584            false,
4585        )
4586        .unwrap();
4587
4588        assert_eq!(s.poll_server(), Err(Error::Done));
4589    }
4590
4591    #[test]
4592    /// Send a MAX_PUSH_ID frame from the client on an invalid stream.
4593    fn max_push_id_from_client_bad_stream() {
4594        let mut s = Session::new().unwrap();
4595        s.handshake().unwrap();
4596
4597        let (stream, req) = s.send_request(false).unwrap();
4598
4599        s.send_frame_client(
4600            frame::Frame::MaxPushId { push_id: 2 },
4601            stream,
4602            false,
4603        )
4604        .unwrap();
4605
4606        let ev_headers = Event::Headers {
4607            list: req,
4608            more_frames: true,
4609        };
4610
4611        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4612        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4613    }
4614
4615    #[test]
4616    /// Send a sequence of MAX_PUSH_ID frames from the client that attempt to
4617    /// reduce the limit.
4618    fn max_push_id_from_client_limit_reduction() {
4619        let mut s = Session::new().unwrap();
4620        s.handshake().unwrap();
4621
4622        s.send_frame_client(
4623            frame::Frame::MaxPushId { push_id: 2 },
4624            s.client.control_stream_id.unwrap(),
4625            false,
4626        )
4627        .unwrap();
4628
4629        s.send_frame_client(
4630            frame::Frame::MaxPushId { push_id: 1 },
4631            s.client.control_stream_id.unwrap(),
4632            false,
4633        )
4634        .unwrap();
4635
4636        assert_eq!(s.poll_server(), Err(Error::IdError));
4637    }
4638
4639    #[test]
4640    /// Send a MAX_PUSH_ID frame from the server, which is forbidden.
4641    fn max_push_id_from_server() {
4642        let mut s = Session::new().unwrap();
4643        s.handshake().unwrap();
4644
4645        s.send_frame_server(
4646            frame::Frame::MaxPushId { push_id: 1 },
4647            s.server.control_stream_id.unwrap(),
4648            false,
4649        )
4650        .unwrap();
4651
4652        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
4653    }
4654
4655    #[test]
4656    /// Send a PUSH_PROMISE frame from the client, which is forbidden.
4657    fn push_promise_from_client() {
4658        let mut s = Session::new().unwrap();
4659        s.handshake().unwrap();
4660
4661        let (stream, req) = s.send_request(false).unwrap();
4662
4663        let header_block = s.client.encode_header_block(&req).unwrap();
4664
4665        s.send_frame_client(
4666            frame::Frame::PushPromise {
4667                push_id: 1,
4668                header_block,
4669            },
4670            stream,
4671            false,
4672        )
4673        .unwrap();
4674
4675        let ev_headers = Event::Headers {
4676            list: req,
4677            more_frames: true,
4678        };
4679
4680        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4681        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4682    }
4683
4684    #[test]
4685    /// Send a CANCEL_PUSH frame from the client.
4686    fn cancel_push_from_client() {
4687        let mut s = Session::new().unwrap();
4688        s.handshake().unwrap();
4689
4690        s.send_frame_client(
4691            frame::Frame::CancelPush { push_id: 1 },
4692            s.client.control_stream_id.unwrap(),
4693            false,
4694        )
4695        .unwrap();
4696
4697        assert_eq!(s.poll_server(), Err(Error::Done));
4698    }
4699
4700    #[test]
4701    /// Send a CANCEL_PUSH frame from the client on an invalid stream.
4702    fn cancel_push_from_client_bad_stream() {
4703        let mut s = Session::new().unwrap();
4704        s.handshake().unwrap();
4705
4706        let (stream, req) = s.send_request(false).unwrap();
4707
4708        s.send_frame_client(
4709            frame::Frame::CancelPush { push_id: 2 },
4710            stream,
4711            false,
4712        )
4713        .unwrap();
4714
4715        let ev_headers = Event::Headers {
4716            list: req,
4717            more_frames: true,
4718        };
4719
4720        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
4721        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
4722    }
4723
4724    #[test]
4725    /// Send a CANCEL_PUSH frame from the client.
4726    fn cancel_push_from_server() {
4727        let mut s = Session::new().unwrap();
4728        s.handshake().unwrap();
4729
4730        s.send_frame_server(
4731            frame::Frame::CancelPush { push_id: 1 },
4732            s.server.control_stream_id.unwrap(),
4733            false,
4734        )
4735        .unwrap();
4736
4737        assert_eq!(s.poll_client(), Err(Error::Done));
4738    }
4739
4740    #[test]
4741    /// Send a GOAWAY frame from the client.
4742    fn goaway_from_client_good() {
4743        let mut s = Session::new().unwrap();
4744        s.handshake().unwrap();
4745
4746        s.client.send_goaway(&mut s.pipe.client, 100).unwrap();
4747
4748        s.advance().ok();
4749
4750        // TODO: server push
4751        assert_eq!(s.poll_server(), Ok((0, Event::GoAway)));
4752    }
4753
4754    #[test]
4755    /// Send a GOAWAY frame from the server.
4756    fn goaway_from_server_good() {
4757        let mut s = Session::new().unwrap();
4758        s.handshake().unwrap();
4759
4760        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4761
4762        s.advance().ok();
4763
4764        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4765    }
4766
4767    #[test]
4768    /// A client MUST NOT send a request after it receives GOAWAY.
4769    fn client_request_after_goaway() {
4770        let mut s = Session::new().unwrap();
4771        s.handshake().unwrap();
4772
4773        s.server.send_goaway(&mut s.pipe.server, 4000).unwrap();
4774
4775        s.advance().ok();
4776
4777        assert_eq!(s.poll_client(), Ok((4000, Event::GoAway)));
4778
4779        assert_eq!(s.send_request(true), Err(Error::FrameUnexpected));
4780    }
4781
4782    #[test]
4783    /// Send a GOAWAY frame from the server, using an invalid goaway ID.
4784    fn goaway_from_server_invalid_id() {
4785        let mut s = Session::new().unwrap();
4786        s.handshake().unwrap();
4787
4788        s.send_frame_server(
4789            frame::Frame::GoAway { id: 1 },
4790            s.server.control_stream_id.unwrap(),
4791            false,
4792        )
4793        .unwrap();
4794
4795        assert_eq!(s.poll_client(), Err(Error::IdError));
4796    }
4797
4798    #[test]
4799    /// Send multiple GOAWAY frames from the server, that increase the goaway
4800    /// ID.
4801    fn goaway_from_server_increase_id() {
4802        let mut s = Session::new().unwrap();
4803        s.handshake().unwrap();
4804
4805        s.send_frame_server(
4806            frame::Frame::GoAway { id: 0 },
4807            s.server.control_stream_id.unwrap(),
4808            false,
4809        )
4810        .unwrap();
4811
4812        s.send_frame_server(
4813            frame::Frame::GoAway { id: 4 },
4814            s.server.control_stream_id.unwrap(),
4815            false,
4816        )
4817        .unwrap();
4818
4819        assert_eq!(s.poll_client(), Ok((0, Event::GoAway)));
4820
4821        assert_eq!(s.poll_client(), Err(Error::IdError));
4822    }
4823
4824    #[test]
4825    #[cfg(feature = "sfv")]
4826    fn parse_priority_field_value() {
4827        // Legal dicts
4828        assert_eq!(
4829            Ok(Priority::new(0, false)),
4830            Priority::try_from(b"u=0".as_slice())
4831        );
4832        assert_eq!(
4833            Ok(Priority::new(3, false)),
4834            Priority::try_from(b"u=3".as_slice())
4835        );
4836        assert_eq!(
4837            Ok(Priority::new(7, false)),
4838            Priority::try_from(b"u=7".as_slice())
4839        );
4840
4841        assert_eq!(
4842            Ok(Priority::new(0, true)),
4843            Priority::try_from(b"u=0, i".as_slice())
4844        );
4845        assert_eq!(
4846            Ok(Priority::new(3, true)),
4847            Priority::try_from(b"u=3, i".as_slice())
4848        );
4849        assert_eq!(
4850            Ok(Priority::new(7, true)),
4851            Priority::try_from(b"u=7, i".as_slice())
4852        );
4853
4854        assert_eq!(
4855            Ok(Priority::new(0, true)),
4856            Priority::try_from(b"u=0, i=?1".as_slice())
4857        );
4858        assert_eq!(
4859            Ok(Priority::new(3, true)),
4860            Priority::try_from(b"u=3, i=?1".as_slice())
4861        );
4862        assert_eq!(
4863            Ok(Priority::new(7, true)),
4864            Priority::try_from(b"u=7, i=?1".as_slice())
4865        );
4866
4867        assert_eq!(
4868            Ok(Priority::new(3, false)),
4869            Priority::try_from(b"".as_slice())
4870        );
4871
4872        assert_eq!(
4873            Ok(Priority::new(0, true)),
4874            Priority::try_from(b"u=0;foo, i;bar".as_slice())
4875        );
4876        assert_eq!(
4877            Ok(Priority::new(3, true)),
4878            Priority::try_from(b"u=3;hello, i;world".as_slice())
4879        );
4880        assert_eq!(
4881            Ok(Priority::new(7, true)),
4882            Priority::try_from(b"u=7;croeso, i;gymru".as_slice())
4883        );
4884
4885        assert_eq!(
4886            Ok(Priority::new(0, true)),
4887            Priority::try_from(b"u=0, i, spinaltap=11".as_slice())
4888        );
4889
4890        // Illegal formats
4891        assert_eq!(Err(Error::Done), Priority::try_from(b"0".as_slice()));
4892        assert_eq!(
4893            Ok(Priority::new(7, false)),
4894            Priority::try_from(b"u=-1".as_slice())
4895        );
4896        assert_eq!(Err(Error::Done), Priority::try_from(b"u=0.2".as_slice()));
4897        assert_eq!(
4898            Ok(Priority::new(7, false)),
4899            Priority::try_from(b"u=100".as_slice())
4900        );
4901        assert_eq!(
4902            Err(Error::Done),
4903            Priority::try_from(b"u=3, i=true".as_slice())
4904        );
4905
4906        // Trailing comma in dict is malformed
4907        assert_eq!(Err(Error::Done), Priority::try_from(b"u=7, ".as_slice()));
4908    }
4909
4910    #[test]
4911    /// Send a PRIORITY_UPDATE for request stream from the client.
4912    fn priority_update_request() {
4913        let mut s = Session::new().unwrap();
4914        s.handshake().unwrap();
4915
4916        s.client
4917            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4918                urgency: 3,
4919                incremental: false,
4920            })
4921            .unwrap();
4922        s.advance().ok();
4923
4924        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4925        assert_eq!(s.poll_server(), Err(Error::Done));
4926    }
4927
4928    #[test]
4929    /// Send a PRIORITY_UPDATE for request stream from the client.
4930    fn priority_update_single_stream_rearm() {
4931        let mut s = Session::new().unwrap();
4932        s.handshake().unwrap();
4933
4934        s.client
4935            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4936                urgency: 3,
4937                incremental: false,
4938            })
4939            .unwrap();
4940        s.advance().ok();
4941
4942        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4943        assert_eq!(s.poll_server(), Err(Error::Done));
4944
4945        s.client
4946            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4947                urgency: 5,
4948                incremental: false,
4949            })
4950            .unwrap();
4951        s.advance().ok();
4952
4953        assert_eq!(s.poll_server(), Err(Error::Done));
4954
4955        // There is only one PRIORITY_UPDATE frame to read. Once read, the event
4956        // will rearm ready for more.
4957        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
4958        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4959
4960        s.client
4961            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4962                urgency: 7,
4963                incremental: false,
4964            })
4965            .unwrap();
4966        s.advance().ok();
4967
4968        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4969        assert_eq!(s.poll_server(), Err(Error::Done));
4970
4971        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
4972        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
4973    }
4974
4975    #[test]
4976    /// Send multiple PRIORITY_UPDATE frames for different streams from the
4977    /// client across multiple flights of exchange.
4978    fn priority_update_request_multiple_stream_arm_multiple_flights() {
4979        let mut s = Session::new().unwrap();
4980        s.handshake().unwrap();
4981
4982        s.client
4983            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
4984                urgency: 3,
4985                incremental: false,
4986            })
4987            .unwrap();
4988        s.advance().ok();
4989
4990        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
4991        assert_eq!(s.poll_server(), Err(Error::Done));
4992
4993        s.client
4994            .send_priority_update_for_request(&mut s.pipe.client, 4, &Priority {
4995                urgency: 1,
4996                incremental: false,
4997            })
4998            .unwrap();
4999        s.advance().ok();
5000
5001        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
5002        assert_eq!(s.poll_server(), Err(Error::Done));
5003
5004        s.client
5005            .send_priority_update_for_request(&mut s.pipe.client, 8, &Priority {
5006                urgency: 2,
5007                incremental: false,
5008            })
5009            .unwrap();
5010        s.advance().ok();
5011
5012        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
5013        assert_eq!(s.poll_server(), Err(Error::Done));
5014
5015        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5016        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=1".to_vec()));
5017        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=2".to_vec()));
5018        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5019    }
5020
5021    #[test]
5022    /// Send multiple PRIORITY_UPDATE frames for different streams from the
5023    /// client across a single flight.
5024    fn priority_update_request_multiple_stream_arm_single_flight() {
5025        let mut s = Session::new().unwrap();
5026        s.handshake().unwrap();
5027
5028        let mut d = [42; 65535];
5029
5030        let mut b = octets::OctetsMut::with_slice(&mut d);
5031
5032        let p1 = frame::Frame::PriorityUpdateRequest {
5033            prioritized_element_id: 0,
5034            priority_field_value: b"u=3".to_vec(),
5035        };
5036
5037        let p2 = frame::Frame::PriorityUpdateRequest {
5038            prioritized_element_id: 4,
5039            priority_field_value: b"u=3".to_vec(),
5040        };
5041
5042        let p3 = frame::Frame::PriorityUpdateRequest {
5043            prioritized_element_id: 8,
5044            priority_field_value: b"u=3".to_vec(),
5045        };
5046
5047        p1.to_bytes(&mut b).unwrap();
5048        p2.to_bytes(&mut b).unwrap();
5049        p3.to_bytes(&mut b).unwrap();
5050
5051        let off = b.off();
5052        s.pipe
5053            .client
5054            .stream_send(s.client.control_stream_id.unwrap(), &d[..off], false)
5055            .unwrap();
5056
5057        s.advance().ok();
5058
5059        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5060        assert_eq!(s.poll_server(), Ok((4, Event::PriorityUpdate)));
5061        assert_eq!(s.poll_server(), Ok((8, Event::PriorityUpdate)));
5062        assert_eq!(s.poll_server(), Err(Error::Done));
5063
5064        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5065        assert_eq!(s.server.take_last_priority_update(4), Ok(b"u=3".to_vec()));
5066        assert_eq!(s.server.take_last_priority_update(8), Ok(b"u=3".to_vec()));
5067
5068        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5069    }
5070
5071    #[test]
5072    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
5073    /// has been completed.
5074    fn priority_update_request_collected_completed() {
5075        let mut s = Session::new().unwrap();
5076        s.handshake().unwrap();
5077
5078        s.client
5079            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5080                urgency: 3,
5081                incremental: false,
5082            })
5083            .unwrap();
5084        s.advance().ok();
5085
5086        let (stream, req) = s.send_request(true).unwrap();
5087        let ev_headers = Event::Headers {
5088            list: req,
5089            more_frames: false,
5090        };
5091
5092        // Priority event is generated before request headers.
5093        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5094        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5095        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5096        assert_eq!(s.poll_server(), Err(Error::Done));
5097
5098        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5099        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5100
5101        let resp = s.send_response(stream, true).unwrap();
5102
5103        let ev_headers = Event::Headers {
5104            list: resp,
5105            more_frames: false,
5106        };
5107
5108        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
5109        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
5110        assert_eq!(s.poll_client(), Err(Error::Done));
5111
5112        // Now send a PRIORITY_UPDATE for the completed request stream.
5113        s.client
5114            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5115                urgency: 3,
5116                incremental: false,
5117            })
5118            .unwrap();
5119        s.advance().ok();
5120
5121        // No event generated at server
5122        assert_eq!(s.poll_server(), Err(Error::Done));
5123    }
5124
5125    #[test]
5126    /// Send a PRIORITY_UPDATE for a request stream, before and after the stream
5127    /// has been stopped.
5128    fn priority_update_request_collected_stopped() {
5129        let mut s = Session::new().unwrap();
5130        s.handshake().unwrap();
5131
5132        s.client
5133            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5134                urgency: 3,
5135                incremental: false,
5136            })
5137            .unwrap();
5138        s.advance().ok();
5139
5140        let (stream, req) = s.send_request(false).unwrap();
5141        let ev_headers = Event::Headers {
5142            list: req,
5143            more_frames: true,
5144        };
5145
5146        // Priority event is generated before request headers.
5147        assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
5148        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5149        assert_eq!(s.poll_server(), Err(Error::Done));
5150
5151        assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
5152        assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
5153
5154        s.pipe
5155            .client
5156            .stream_shutdown(stream, crate::Shutdown::Write, 0x100)
5157            .unwrap();
5158        s.pipe
5159            .client
5160            .stream_shutdown(stream, crate::Shutdown::Read, 0x100)
5161            .unwrap();
5162
5163        s.advance().ok();
5164
5165        assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
5166        assert_eq!(s.poll_server(), Err(Error::Done));
5167
5168        // Now send a PRIORITY_UPDATE for the closed request stream.
5169        s.client
5170            .send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
5171                urgency: 3,
5172                incremental: false,
5173            })
5174            .unwrap();
5175        s.advance().ok();
5176
5177        // No event generated at server
5178        assert_eq!(s.poll_server(), Err(Error::Done));
5179
5180        assert!(s.pipe.server.streams.is_collected(0));
5181        assert!(s.pipe.client.streams.is_collected(0));
5182    }
5183
5184    #[test]
5185    /// Send a PRIORITY_UPDATE for push stream from the client.
5186    fn priority_update_push() {
5187        let mut s = Session::new().unwrap();
5188        s.handshake().unwrap();
5189
5190        s.send_frame_client(
5191            frame::Frame::PriorityUpdatePush {
5192                prioritized_element_id: 3,
5193                priority_field_value: b"u=3".to_vec(),
5194            },
5195            s.client.control_stream_id.unwrap(),
5196            false,
5197        )
5198        .unwrap();
5199
5200        assert_eq!(s.poll_server(), Err(Error::Done));
5201    }
5202
5203    #[test]
5204    /// Send a PRIORITY_UPDATE for request stream from the client but for an
5205    /// incorrect stream type.
5206    fn priority_update_request_bad_stream() {
5207        let mut s = Session::new().unwrap();
5208        s.handshake().unwrap();
5209
5210        s.send_frame_client(
5211            frame::Frame::PriorityUpdateRequest {
5212                prioritized_element_id: 5,
5213                priority_field_value: b"u=3".to_vec(),
5214            },
5215            s.client.control_stream_id.unwrap(),
5216            false,
5217        )
5218        .unwrap();
5219
5220        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5221    }
5222
5223    #[test]
5224    /// Send a PRIORITY_UPDATE for push stream from the client but for an
5225    /// incorrect stream type.
5226    fn priority_update_push_bad_stream() {
5227        let mut s = Session::new().unwrap();
5228        s.handshake().unwrap();
5229
5230        s.send_frame_client(
5231            frame::Frame::PriorityUpdatePush {
5232                prioritized_element_id: 5,
5233                priority_field_value: b"u=3".to_vec(),
5234            },
5235            s.client.control_stream_id.unwrap(),
5236            false,
5237        )
5238        .unwrap();
5239
5240        assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
5241    }
5242
5243    #[test]
5244    /// Send a PRIORITY_UPDATE for request stream from the server.
5245    fn priority_update_request_from_server() {
5246        let mut s = Session::new().unwrap();
5247        s.handshake().unwrap();
5248
5249        s.send_frame_server(
5250            frame::Frame::PriorityUpdateRequest {
5251                prioritized_element_id: 0,
5252                priority_field_value: b"u=3".to_vec(),
5253            },
5254            s.server.control_stream_id.unwrap(),
5255            false,
5256        )
5257        .unwrap();
5258
5259        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5260    }
5261
5262    #[test]
5263    /// Send a PRIORITY_UPDATE for request stream from the server.
5264    fn priority_update_push_from_server() {
5265        let mut s = Session::new().unwrap();
5266        s.handshake().unwrap();
5267
5268        s.send_frame_server(
5269            frame::Frame::PriorityUpdatePush {
5270                prioritized_element_id: 0,
5271                priority_field_value: b"u=3".to_vec(),
5272            },
5273            s.server.control_stream_id.unwrap(),
5274            false,
5275        )
5276        .unwrap();
5277
5278        assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
5279    }
5280
5281    #[test]
5282    /// Ensure quiche allocates streams for client and server roles as expected.
5283    fn uni_stream_local_counting() {
5284        let config = Config::new().unwrap();
5285
5286        let h3_cln = Connection::new(&config, false, false).unwrap();
5287        assert_eq!(h3_cln.next_uni_stream_id, 2);
5288
5289        let h3_srv = Connection::new(&config, true, false).unwrap();
5290        assert_eq!(h3_srv.next_uni_stream_id, 3);
5291    }
5292
5293    #[test]
5294    /// Client opens multiple control streams, which is forbidden.
5295    fn open_multiple_control_streams() {
5296        let mut s = Session::new().unwrap();
5297        s.handshake().unwrap();
5298
5299        let stream_id = s.client.next_uni_stream_id;
5300
5301        let mut d = [42; 8];
5302        let mut b = octets::OctetsMut::with_slice(&mut d);
5303
5304        s.pipe
5305            .client
5306            .stream_send(
5307                stream_id,
5308                b.put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID).unwrap(),
5309                false,
5310            )
5311            .unwrap();
5312
5313        s.advance().ok();
5314
5315        assert_eq!(s.poll_server(), Err(Error::StreamCreationError));
5316    }
5317
5318    #[test]
5319    /// Client closes the control stream, which is forbidden.
5320    fn close_control_stream_after_type() {
5321        let mut s = Session::new().unwrap();
5322        s.handshake().unwrap();
5323
5324        s.pipe
5325            .client
5326            .stream_send(s.client.control_stream_id.unwrap(), &[], true)
5327            .unwrap();
5328
5329        s.advance().ok();
5330
5331        assert_eq!(
5332            Err(Error::ClosedCriticalStream),
5333            s.server.poll(&mut s.pipe.server)
5334        );
5335        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5336    }
5337
5338    #[test]
5339    /// Client closes the control stream after a frame is sent, which is
5340    /// forbidden.
5341    fn close_control_stream_after_frame() {
5342        let mut s = Session::new().unwrap();
5343        s.handshake().unwrap();
5344
5345        s.send_frame_client(
5346            frame::Frame::MaxPushId { push_id: 1 },
5347            s.client.control_stream_id.unwrap(),
5348            true,
5349        )
5350        .unwrap();
5351
5352        assert_eq!(
5353            Err(Error::ClosedCriticalStream),
5354            s.server.poll(&mut s.pipe.server)
5355        );
5356        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5357    }
5358
5359    #[test]
5360    /// Client resets the control stream, which is forbidden.
5361    fn reset_control_stream_after_type() {
5362        let mut s = Session::new().unwrap();
5363        s.handshake().unwrap();
5364
5365        s.pipe
5366            .client
5367            .stream_shutdown(
5368                s.client.control_stream_id.unwrap(),
5369                crate::Shutdown::Write,
5370                0,
5371            )
5372            .unwrap();
5373
5374        s.advance().ok();
5375
5376        assert_eq!(
5377            Err(Error::ClosedCriticalStream),
5378            s.server.poll(&mut s.pipe.server)
5379        );
5380        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5381    }
5382
5383    #[test]
5384    /// Client resets the control stream after a frame is sent, which is
5385    /// forbidden.
5386    fn reset_control_stream_after_frame() {
5387        let mut s = Session::new().unwrap();
5388        s.handshake().unwrap();
5389
5390        s.send_frame_client(
5391            frame::Frame::MaxPushId { push_id: 1 },
5392            s.client.control_stream_id.unwrap(),
5393            false,
5394        )
5395        .unwrap();
5396
5397        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5398
5399        s.pipe
5400            .client
5401            .stream_shutdown(
5402                s.client.control_stream_id.unwrap(),
5403                crate::Shutdown::Write,
5404                0,
5405            )
5406            .unwrap();
5407
5408        s.advance().ok();
5409
5410        assert_eq!(
5411            Err(Error::ClosedCriticalStream),
5412            s.server.poll(&mut s.pipe.server)
5413        );
5414        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5415    }
5416
5417    #[test]
5418    /// Client closes QPACK stream, which is forbidden.
5419    fn close_qpack_stream_after_type() {
5420        let mut s = Session::new().unwrap();
5421        s.handshake().unwrap();
5422
5423        s.pipe
5424            .client
5425            .stream_send(
5426                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5427                &[],
5428                true,
5429            )
5430            .unwrap();
5431
5432        s.advance().ok();
5433
5434        assert_eq!(
5435            Err(Error::ClosedCriticalStream),
5436            s.server.poll(&mut s.pipe.server)
5437        );
5438        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5439    }
5440
5441    #[test]
5442    /// Client closes QPACK stream after sending some stuff, which is forbidden.
5443    fn close_qpack_stream_after_data() {
5444        let mut s = Session::new().unwrap();
5445        s.handshake().unwrap();
5446
5447        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5448        let d = [0; 1];
5449
5450        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5451        s.pipe.client.stream_send(stream_id, &d, true).unwrap();
5452
5453        s.advance().ok();
5454
5455        assert_eq!(
5456            Err(Error::ClosedCriticalStream),
5457            s.server.poll(&mut s.pipe.server)
5458        );
5459        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5460    }
5461
5462    #[test]
5463    /// Client resets QPACK stream, which is forbidden.
5464    fn reset_qpack_stream_after_type() {
5465        let mut s = Session::new().unwrap();
5466        s.handshake().unwrap();
5467
5468        s.pipe
5469            .client
5470            .stream_shutdown(
5471                s.client.local_qpack_streams.encoder_stream_id.unwrap(),
5472                crate::Shutdown::Write,
5473                0,
5474            )
5475            .unwrap();
5476
5477        s.advance().ok();
5478
5479        assert_eq!(
5480            Err(Error::ClosedCriticalStream),
5481            s.server.poll(&mut s.pipe.server)
5482        );
5483        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5484    }
5485
5486    #[test]
5487    /// Client resets QPACK stream after sending some stuff, which is forbidden.
5488    fn reset_qpack_stream_after_data() {
5489        let mut s = Session::new().unwrap();
5490        s.handshake().unwrap();
5491
5492        let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5493        let d = [0; 1];
5494
5495        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5496        s.pipe.client.stream_send(stream_id, &d, false).unwrap();
5497
5498        s.advance().ok();
5499
5500        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5501
5502        s.pipe
5503            .client
5504            .stream_shutdown(stream_id, crate::Shutdown::Write, 0)
5505            .unwrap();
5506
5507        s.advance().ok();
5508
5509        assert_eq!(
5510            Err(Error::ClosedCriticalStream),
5511            s.server.poll(&mut s.pipe.server)
5512        );
5513        assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
5514    }
5515
5516    #[test]
5517    /// Client sends QPACK data.
5518    fn qpack_data() {
5519        // TODO: QPACK instructions are ignored until dynamic table support is
5520        // added so we just test that the data is safely ignored.
5521        let mut s = Session::new().unwrap();
5522        s.handshake().unwrap();
5523
5524        let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
5525        let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();
5526        let d = [0; 20];
5527
5528        s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
5529        s.advance().ok();
5530
5531        s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();
5532        s.advance().ok();
5533
5534        match s.server.poll(&mut s.pipe.server) {
5535            Ok(_) => panic!(),
5536
5537            Err(Error::Done) => {
5538                assert_eq!(s.server.peer_qpack_streams.encoder_stream_bytes, 20);
5539                assert_eq!(s.server.peer_qpack_streams.decoder_stream_bytes, 20);
5540            },
5541
5542            Err(_) => {
5543                panic!();
5544            },
5545        }
5546
5547        let stats = s.server.stats();
5548        assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
5549        assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
5550    }
5551
5552    #[test]
5553    /// Tests limits for the stream state buffer maximum size.
5554    fn max_state_buf_size() {
5555        let mut s = Session::new().unwrap();
5556        s.handshake().unwrap();
5557
5558        let req = vec![
5559            Header::new(b":method", b"GET"),
5560            Header::new(b":scheme", b"https"),
5561            Header::new(b":authority", b"quic.tech"),
5562            Header::new(b":path", b"/test"),
5563            Header::new(b"user-agent", b"quiche-test"),
5564        ];
5565
5566        assert_eq!(
5567            s.client.send_request(&mut s.pipe.client, &req, false),
5568            Ok(0)
5569        );
5570
5571        s.advance().ok();
5572
5573        let ev_headers = Event::Headers {
5574            list: req,
5575            more_frames: true,
5576        };
5577
5578        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
5579
5580        // DATA frames don't consume the state buffer, so can be of any size.
5581        let mut d = [42; 128];
5582        let mut b = octets::OctetsMut::with_slice(&mut d);
5583
5584        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5585        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5586
5587        let frame_len = b.put_varint(1 << 24).unwrap();
5588        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5589
5590        s.pipe.client.stream_send(0, &d, false).unwrap();
5591
5592        s.advance().ok();
5593
5594        assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
5595
5596        // GREASE frames consume the state buffer, so need to be limited.
5597        let mut s = Session::new().unwrap();
5598        s.handshake().unwrap();
5599
5600        let mut d = [42; 128];
5601        let mut b = octets::OctetsMut::with_slice(&mut d);
5602
5603        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5604        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5605
5606        let frame_len = b.put_varint(1 << 24).unwrap();
5607        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5608
5609        s.pipe.client.stream_send(0, &d, false).unwrap();
5610
5611        s.advance().ok();
5612
5613        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5614    }
5615
5616    #[test]
5617    /// Tests that DATA frames are properly truncated depending on the request
5618    /// stream's outgoing flow control capacity.
5619    fn stream_backpressure() {
5620        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
5621
5622        let mut s = Session::new().unwrap();
5623        s.handshake().unwrap();
5624
5625        let (stream, req) = s.send_request(false).unwrap();
5626
5627        let total_data_frames = 6;
5628
5629        for _ in 0..total_data_frames {
5630            assert_eq!(
5631                s.client
5632                    .send_body(&mut s.pipe.client, stream, &bytes, false),
5633                Ok(bytes.len())
5634            );
5635
5636            s.advance().ok();
5637        }
5638
5639        assert_eq!(
5640            s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
5641            Ok(bytes.len() - 2)
5642        );
5643
5644        s.advance().ok();
5645
5646        let mut recv_buf = vec![0; bytes.len()];
5647
5648        let ev_headers = Event::Headers {
5649            list: req,
5650            more_frames: true,
5651        };
5652
5653        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5654        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
5655        assert_eq!(s.poll_server(), Err(Error::Done));
5656
5657        for _ in 0..total_data_frames {
5658            assert_eq!(
5659                s.recv_body_server(stream, &mut recv_buf),
5660                Ok(bytes.len())
5661            );
5662        }
5663
5664        assert_eq!(
5665            s.recv_body_server(stream, &mut recv_buf),
5666            Ok(bytes.len() - 2)
5667        );
5668
5669        // Fin flag from last send_body() call was not sent as the buffer was
5670        // only partially written.
5671        assert_eq!(s.poll_server(), Err(Error::Done));
5672
5673        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5674        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5675        assert_eq!(s.pipe.server.data_blocked_recv_count, 0);
5676        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 1);
5677
5678        assert_eq!(s.pipe.client.data_blocked_sent_count, 0);
5679        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 1);
5680        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5681        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5682    }
5683
5684    #[test]
5685    /// Tests that the max header list size setting is enforced.
5686    fn request_max_header_size_limit() {
5687        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5688        config
5689            .load_cert_chain_from_pem_file("examples/cert.crt")
5690            .unwrap();
5691        config
5692            .load_priv_key_from_pem_file("examples/cert.key")
5693            .unwrap();
5694        config.set_application_protos(&[b"h3"]).unwrap();
5695        config.set_initial_max_data(1500);
5696        config.set_initial_max_stream_data_bidi_local(150);
5697        config.set_initial_max_stream_data_bidi_remote(150);
5698        config.set_initial_max_stream_data_uni(150);
5699        config.set_initial_max_streams_bidi(5);
5700        config.set_initial_max_streams_uni(5);
5701        config.verify_peer(false);
5702
5703        let mut h3_config = Config::new().unwrap();
5704        h3_config.set_max_field_section_size(65);
5705
5706        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5707
5708        s.handshake().unwrap();
5709
5710        let req = vec![
5711            Header::new(b":method", b"GET"),
5712            Header::new(b":scheme", b"https"),
5713            Header::new(b":authority", b"quic.tech"),
5714            Header::new(b":path", b"/test"),
5715            Header::new(b"aaaaaaa", b"aaaaaaaa"),
5716        ];
5717
5718        let stream = s
5719            .client
5720            .send_request(&mut s.pipe.client, &req, true)
5721            .unwrap();
5722
5723        s.advance().ok();
5724
5725        assert_eq!(stream, 0);
5726
5727        assert_eq!(s.poll_server(), Err(Error::ExcessiveLoad));
5728
5729        assert_eq!(
5730            s.pipe.server.local_error.as_ref().unwrap().error_code,
5731            Error::to_wire(Error::ExcessiveLoad)
5732        );
5733    }
5734
5735    #[test]
5736    /// Tests that Error::TransportError contains a transport error.
5737    fn transport_error() {
5738        let mut s = Session::new().unwrap();
5739        s.handshake().unwrap();
5740
5741        let req = vec![
5742            Header::new(b":method", b"GET"),
5743            Header::new(b":scheme", b"https"),
5744            Header::new(b":authority", b"quic.tech"),
5745            Header::new(b":path", b"/test"),
5746            Header::new(b"user-agent", b"quiche-test"),
5747        ];
5748
5749        // We need to open all streams in the same flight, so we can't use the
5750        // Session::send_request() method because it also calls advance(),
5751        // otherwise the server would send a MAX_STREAMS frame and the client
5752        // wouldn't hit the streams limit.
5753        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5754        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5755        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(8));
5756        assert_eq!(
5757            s.client.send_request(&mut s.pipe.client, &req, true),
5758            Ok(12)
5759        );
5760        assert_eq!(
5761            s.client.send_request(&mut s.pipe.client, &req, true),
5762            Ok(16)
5763        );
5764
5765        assert_eq!(
5766            s.client.send_request(&mut s.pipe.client, &req, true),
5767            Err(Error::TransportError(crate::Error::StreamLimit))
5768        );
5769    }
5770
5771    #[test]
5772    /// Tests that sending DATA before HEADERS causes an error.
5773    fn data_before_headers() {
5774        let mut s = Session::new().unwrap();
5775        s.handshake().unwrap();
5776
5777        let mut d = [42; 128];
5778        let mut b = octets::OctetsMut::with_slice(&mut d);
5779
5780        let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
5781        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5782
5783        let frame_len = b.put_varint(5).unwrap();
5784        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5785
5786        s.pipe.client.stream_send(0, b"hello", false).unwrap();
5787
5788        s.advance().ok();
5789
5790        assert_eq!(
5791            s.server.poll(&mut s.pipe.server),
5792            Err(Error::FrameUnexpected)
5793        );
5794    }
5795
5796    #[test]
5797    /// Tests that calling poll() after an error occurred does nothing.
5798    fn poll_after_error() {
5799        let mut s = Session::new().unwrap();
5800        s.handshake().unwrap();
5801
5802        let mut d = [42; 128];
5803        let mut b = octets::OctetsMut::with_slice(&mut d);
5804
5805        let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
5806        s.pipe.client.stream_send(0, frame_type, false).unwrap();
5807
5808        let frame_len = b.put_varint(1 << 24).unwrap();
5809        s.pipe.client.stream_send(0, frame_len, false).unwrap();
5810
5811        s.pipe.client.stream_send(0, &d, false).unwrap();
5812
5813        s.advance().ok();
5814
5815        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
5816
5817        // Try to call poll() again after an error occurred.
5818        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
5819    }
5820
5821    #[test]
5822    /// Tests that we limit sending HEADERS based on the stream capacity.
5823    fn headers_blocked() {
5824        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5825        config
5826            .load_cert_chain_from_pem_file("examples/cert.crt")
5827            .unwrap();
5828        config
5829            .load_priv_key_from_pem_file("examples/cert.key")
5830            .unwrap();
5831        config.set_application_protos(&[b"h3"]).unwrap();
5832        config.set_initial_max_data(70);
5833        config.set_initial_max_stream_data_bidi_local(150);
5834        config.set_initial_max_stream_data_bidi_remote(150);
5835        config.set_initial_max_stream_data_uni(150);
5836        config.set_initial_max_streams_bidi(100);
5837        config.set_initial_max_streams_uni(5);
5838        config.verify_peer(false);
5839
5840        let h3_config = Config::new().unwrap();
5841
5842        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5843
5844        s.handshake().unwrap();
5845
5846        let req = vec![
5847            Header::new(b":method", b"GET"),
5848            Header::new(b":scheme", b"https"),
5849            Header::new(b":authority", b"quic.tech"),
5850            Header::new(b":path", b"/test"),
5851        ];
5852
5853        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5854
5855        assert_eq!(
5856            s.client.send_request(&mut s.pipe.client, &req, true),
5857            Err(Error::StreamBlocked)
5858        );
5859
5860        // Clear the writable stream queue.
5861        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5862        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5863        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
5864        assert_eq!(s.pipe.client.stream_writable_next(), None);
5865
5866        s.advance().ok();
5867
5868        // Once the server gives flow control credits back, we can send the
5869        // request.
5870        assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
5871        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(4));
5872
5873        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5874        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5875        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5876        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5877
5878        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5879        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5880        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5881        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5882    }
5883
5884    #[test]
5885    /// Ensure StreamBlocked when connection flow control prevents headers.
5886    fn headers_blocked_on_conn() {
5887        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5888        config
5889            .load_cert_chain_from_pem_file("examples/cert.crt")
5890            .unwrap();
5891        config
5892            .load_priv_key_from_pem_file("examples/cert.key")
5893            .unwrap();
5894        config.set_application_protos(&[b"h3"]).unwrap();
5895        config.set_initial_max_data(70);
5896        config.set_initial_max_stream_data_bidi_local(150);
5897        config.set_initial_max_stream_data_bidi_remote(150);
5898        config.set_initial_max_stream_data_uni(150);
5899        config.set_initial_max_streams_bidi(100);
5900        config.set_initial_max_streams_uni(5);
5901        config.verify_peer(false);
5902
5903        let h3_config = Config::new().unwrap();
5904
5905        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5906
5907        s.handshake().unwrap();
5908
5909        // After the HTTP handshake, some bytes of connection flow control have
5910        // been consumed. Fill the connection with more grease data on the control
5911        // stream.
5912        let d = [42; 28];
5913        assert_eq!(s.pipe.client.stream_send(2, &d, false), Ok(23));
5914
5915        let req = vec![
5916            Header::new(b":method", b"GET"),
5917            Header::new(b":scheme", b"https"),
5918            Header::new(b":authority", b"quic.tech"),
5919            Header::new(b":path", b"/test"),
5920        ];
5921
5922        // There is 0 connection-level flow control, so sending a request is
5923        // blocked.
5924        assert_eq!(
5925            s.client.send_request(&mut s.pipe.client, &req, true),
5926            Err(Error::StreamBlocked)
5927        );
5928        assert_eq!(s.pipe.client.stream_writable_next(), None);
5929
5930        // Emit the control stream data and drain it at the server via poll() to
5931        // consumes it via poll() and gives back flow control.
5932        s.advance().ok();
5933        assert_eq!(s.poll_server(), Err(Error::Done));
5934        s.advance().ok();
5935
5936        // Now we can send the request.
5937        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
5938        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
5939        assert_eq!(s.client.send_request(&mut s.pipe.client, &req, true), Ok(0));
5940
5941        assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
5942        assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
5943        assert_eq!(s.pipe.server.data_blocked_recv_count, 1);
5944        assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 0);
5945
5946        assert_eq!(s.pipe.client.data_blocked_sent_count, 1);
5947        assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 0);
5948        assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
5949        assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
5950    }
5951
5952    #[test]
5953    /// Ensure STREAM_DATA_BLOCKED is not emitted multiple times with the same
5954    /// offset when trying to send large bodies.
5955    fn send_body_truncation_stream_blocked() {
5956        use crate::test_utils::decode_pkt;
5957
5958        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
5959        config
5960            .load_cert_chain_from_pem_file("examples/cert.crt")
5961            .unwrap();
5962        config
5963            .load_priv_key_from_pem_file("examples/cert.key")
5964            .unwrap();
5965        config.set_application_protos(&[b"h3"]).unwrap();
5966        config.set_initial_max_data(10000); // large connection-level flow control
5967        config.set_initial_max_stream_data_bidi_local(80);
5968        config.set_initial_max_stream_data_bidi_remote(80);
5969        config.set_initial_max_stream_data_uni(150);
5970        config.set_initial_max_streams_bidi(100);
5971        config.set_initial_max_streams_uni(5);
5972        config.verify_peer(false);
5973
5974        let h3_config = Config::new().unwrap();
5975
5976        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
5977
5978        s.handshake().unwrap();
5979
5980        let (stream, req) = s.send_request(true).unwrap();
5981
5982        let ev_headers = Event::Headers {
5983            list: req,
5984            more_frames: false,
5985        };
5986
5987        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
5988        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
5989
5990        let _ = s.send_response(stream, false).unwrap();
5991
5992        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
5993
5994        // The body must be larger than the stream window would allow
5995        let d = [42; 500];
5996        let mut off = 0;
5997
5998        let sent = s
5999            .server
6000            .send_body(&mut s.pipe.server, stream, &d, true)
6001            .unwrap();
6002        assert_eq!(sent, 25);
6003        off += sent;
6004
6005        // send_body wrote as much as it could (sent < size of buff).
6006        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6007        assert_eq!(
6008            s.server
6009                .send_body(&mut s.pipe.server, stream, &d[off..], true),
6010            Err(Error::Done)
6011        );
6012        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6013
6014        // Now read raw frames to see what the QUIC layer did
6015        let mut buf = [0; 65535];
6016        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
6017
6018        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
6019
6020        let mut iter = frames.iter();
6021
6022        assert_eq!(
6023            iter.next(),
6024            Some(&crate::frame::Frame::StreamDataBlocked {
6025                stream_id: 0,
6026                limit: 80,
6027            })
6028        );
6029
6030        // At the server, after sending the STREAM_DATA_BLOCKED frame, we clear
6031        // the mark.
6032        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
6033
6034        // Don't read any data from the client, so stream flow control is never
6035        // given back in the form of changing the stream's max offset.
6036        // Subsequent body send operations will still fail but no more
6037        // STREAM_DATA_BLOCKED frames should be submitted since the limit didn't
6038        // change. No frames means no packet to send.
6039        assert_eq!(
6040            s.server
6041                .send_body(&mut s.pipe.server, stream, &d[off..], true),
6042            Err(Error::Done)
6043        );
6044        assert_eq!(s.pipe.server.streams.blocked().len(), 0);
6045        assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
6046
6047        // Now update the client's max offset manually.
6048        let frames = [crate::frame::Frame::MaxStreamData {
6049            stream_id: 0,
6050            max: 100,
6051        }];
6052
6053        let pkt_type = crate::packet::Type::Short;
6054        assert_eq!(
6055            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
6056            Ok(39),
6057        );
6058
6059        let sent = s
6060            .server
6061            .send_body(&mut s.pipe.server, stream, &d[off..], true)
6062            .unwrap();
6063        assert_eq!(sent, 18);
6064
6065        // Same thing here...
6066        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6067        assert_eq!(
6068            s.server
6069                .send_body(&mut s.pipe.server, stream, &d[off..], true),
6070            Err(Error::Done)
6071        );
6072        assert_eq!(s.pipe.server.streams.blocked().len(), 1);
6073
6074        let (len, _) = s.pipe.server.send(&mut buf).unwrap();
6075
6076        let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
6077
6078        let mut iter = frames.iter();
6079
6080        assert_eq!(
6081            iter.next(),
6082            Some(&crate::frame::Frame::StreamDataBlocked {
6083                stream_id: 0,
6084                limit: 100,
6085            })
6086        );
6087    }
6088
6089    #[test]
6090    /// Ensure stream doesn't hang due to small cwnd.
6091    fn send_body_stream_blocked_by_small_cwnd() {
6092        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6093        config
6094            .load_cert_chain_from_pem_file("examples/cert.crt")
6095            .unwrap();
6096        config
6097            .load_priv_key_from_pem_file("examples/cert.key")
6098            .unwrap();
6099        config.set_application_protos(&[b"h3"]).unwrap();
6100        config.set_initial_max_data(100000); // large connection-level flow control
6101        config.set_initial_max_stream_data_bidi_local(100000);
6102        config.set_initial_max_stream_data_bidi_remote(50000);
6103        config.set_initial_max_stream_data_uni(150);
6104        config.set_initial_max_streams_bidi(100);
6105        config.set_initial_max_streams_uni(5);
6106        config.verify_peer(false);
6107
6108        let h3_config = Config::new().unwrap();
6109
6110        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6111
6112        s.handshake().unwrap();
6113
6114        let (stream, req) = s.send_request(true).unwrap();
6115
6116        let ev_headers = Event::Headers {
6117            list: req,
6118            more_frames: false,
6119        };
6120
6121        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6122        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6123
6124        let _ = s.send_response(stream, false).unwrap();
6125
6126        // Clear the writable stream queue.
6127        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6128        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6129        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6130        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6131        assert_eq!(s.pipe.server.stream_writable_next(), None);
6132
6133        // The body must be larger than the cwnd would allow.
6134        let send_buf = [42; 80000];
6135
6136        let sent = s
6137            .server
6138            .send_body(&mut s.pipe.server, stream, &send_buf, true)
6139            .unwrap();
6140
6141        // send_body wrote as much as it could (sent < size of buff).
6142        assert_eq!(sent, 11995);
6143
6144        s.advance().ok();
6145
6146        // Client reads received headers and body.
6147        let mut recv_buf = [42; 80000];
6148        assert!(s.poll_client().is_ok());
6149        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6150        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11995));
6151
6152        s.advance().ok();
6153
6154        // Server send cap is smaller than remaining body buffer.
6155        assert!(s.pipe.server.tx_cap < send_buf.len() - sent);
6156
6157        // Once the server cwnd opens up, we can send more body.
6158        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6159    }
6160
6161    #[test]
6162    /// Ensure stream doesn't hang due to small cwnd.
6163    fn send_body_stream_blocked_zero_length() {
6164        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6165        config
6166            .load_cert_chain_from_pem_file("examples/cert.crt")
6167            .unwrap();
6168        config
6169            .load_priv_key_from_pem_file("examples/cert.key")
6170            .unwrap();
6171        config.set_application_protos(&[b"h3"]).unwrap();
6172        config.set_initial_max_data(100000); // large connection-level flow control
6173        config.set_initial_max_stream_data_bidi_local(100000);
6174        config.set_initial_max_stream_data_bidi_remote(50000);
6175        config.set_initial_max_stream_data_uni(150);
6176        config.set_initial_max_streams_bidi(100);
6177        config.set_initial_max_streams_uni(5);
6178        config.verify_peer(false);
6179
6180        let h3_config = Config::new().unwrap();
6181
6182        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6183
6184        s.handshake().unwrap();
6185
6186        let (stream, req) = s.send_request(true).unwrap();
6187
6188        let ev_headers = Event::Headers {
6189            list: req,
6190            more_frames: false,
6191        };
6192
6193        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6194        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6195
6196        let _ = s.send_response(stream, false).unwrap();
6197
6198        // Clear the writable stream queue.
6199        assert_eq!(s.pipe.server.stream_writable_next(), Some(3));
6200        assert_eq!(s.pipe.server.stream_writable_next(), Some(7));
6201        assert_eq!(s.pipe.server.stream_writable_next(), Some(11));
6202        assert_eq!(s.pipe.server.stream_writable_next(), Some(stream));
6203        assert_eq!(s.pipe.server.stream_writable_next(), None);
6204
6205        // The body is large enough to fill the cwnd, except for enough bytes
6206        // for another DATA frame header (but no payload).
6207        let send_buf = [42; 11994];
6208
6209        let sent = s
6210            .server
6211            .send_body(&mut s.pipe.server, stream, &send_buf, false)
6212            .unwrap();
6213
6214        assert_eq!(sent, 11994);
6215
6216        // There is only enough capacity left for the DATA frame header, but
6217        // no payload.
6218        assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);
6219        assert_eq!(
6220            s.server
6221                .send_body(&mut s.pipe.server, stream, &send_buf, false),
6222            Err(Error::Done)
6223        );
6224
6225        s.advance().ok();
6226
6227        // Client reads received headers and body.
6228        let mut recv_buf = [42; 80000];
6229        assert!(s.poll_client().is_ok());
6230        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6231        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(11994));
6232
6233        s.advance().ok();
6234
6235        // Once the server cwnd opens up, we can send more body.
6236        assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
6237    }
6238
6239    #[test]
6240    /// Test handling of 0-length DATA writes with and without fin.
6241    fn zero_length_data() {
6242        let mut s = Session::new().unwrap();
6243        s.handshake().unwrap();
6244
6245        let (stream, req) = s.send_request(false).unwrap();
6246
6247        assert_eq!(
6248            s.client.send_body(&mut s.pipe.client, 0, b"", false),
6249            Err(Error::Done)
6250        );
6251        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6252
6253        s.advance().ok();
6254
6255        let mut recv_buf = vec![0; 100];
6256
6257        let ev_headers = Event::Headers {
6258            list: req,
6259            more_frames: true,
6260        };
6261
6262        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6263
6264        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6265        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
6266
6267        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6268        assert_eq!(s.poll_server(), Err(Error::Done));
6269
6270        let resp = s.send_response(stream, false).unwrap();
6271
6272        assert_eq!(
6273            s.server.send_body(&mut s.pipe.server, 0, b"", false),
6274            Err(Error::Done)
6275        );
6276        assert_eq!(s.server.send_body(&mut s.pipe.server, 0, b"", true), Ok(0));
6277
6278        s.advance().ok();
6279
6280        let ev_headers = Event::Headers {
6281            list: resp,
6282            more_frames: true,
6283        };
6284
6285        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6286
6287        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6288        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Err(Error::Done));
6289
6290        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6291        assert_eq!(s.poll_client(), Err(Error::Done));
6292    }
6293
6294    #[test]
6295    /// Tests that blocked 0-length DATA writes are reported correctly.
6296    fn zero_length_data_blocked() {
6297        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6298        config
6299            .load_cert_chain_from_pem_file("examples/cert.crt")
6300            .unwrap();
6301        config
6302            .load_priv_key_from_pem_file("examples/cert.key")
6303            .unwrap();
6304        config.set_application_protos(&[b"h3"]).unwrap();
6305        config.set_initial_max_data(69);
6306        config.set_initial_max_stream_data_bidi_local(150);
6307        config.set_initial_max_stream_data_bidi_remote(150);
6308        config.set_initial_max_stream_data_uni(150);
6309        config.set_initial_max_streams_bidi(100);
6310        config.set_initial_max_streams_uni(5);
6311        config.verify_peer(false);
6312
6313        let h3_config = Config::new().unwrap();
6314
6315        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6316
6317        s.handshake().unwrap();
6318
6319        let req = vec![
6320            Header::new(b":method", b"GET"),
6321            Header::new(b":scheme", b"https"),
6322            Header::new(b":authority", b"quic.tech"),
6323            Header::new(b":path", b"/test"),
6324        ];
6325
6326        assert_eq!(
6327            s.client.send_request(&mut s.pipe.client, &req, false),
6328            Ok(0)
6329        );
6330
6331        assert_eq!(
6332            s.client.send_body(&mut s.pipe.client, 0, b"", true),
6333            Err(Error::Done)
6334        );
6335
6336        // Clear the writable stream queue.
6337        assert_eq!(s.pipe.client.stream_writable_next(), Some(2));
6338        assert_eq!(s.pipe.client.stream_writable_next(), Some(6));
6339        assert_eq!(s.pipe.client.stream_writable_next(), Some(10));
6340        assert_eq!(s.pipe.client.stream_writable_next(), None);
6341
6342        s.advance().ok();
6343
6344        // Once the server gives flow control credits back, we can send the body.
6345        assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
6346        assert_eq!(s.client.send_body(&mut s.pipe.client, 0, b"", true), Ok(0));
6347    }
6348
6349    #[test]
6350    /// Tests that receiving an empty SETTINGS frame is handled and reported.
6351    fn empty_settings() {
6352        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6353        config
6354            .load_cert_chain_from_pem_file("examples/cert.crt")
6355            .unwrap();
6356        config
6357            .load_priv_key_from_pem_file("examples/cert.key")
6358            .unwrap();
6359        config.set_application_protos(&[b"h3"]).unwrap();
6360        config.set_initial_max_data(1500);
6361        config.set_initial_max_stream_data_bidi_local(150);
6362        config.set_initial_max_stream_data_bidi_remote(150);
6363        config.set_initial_max_stream_data_uni(150);
6364        config.set_initial_max_streams_bidi(5);
6365        config.set_initial_max_streams_uni(5);
6366        config.verify_peer(false);
6367        config.set_ack_delay_exponent(8);
6368        config.grease(false);
6369
6370        let h3_config = Config::new().unwrap();
6371        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6372
6373        s.handshake().unwrap();
6374
6375        assert!(s.client.peer_settings_raw().is_some());
6376        assert!(s.server.peer_settings_raw().is_some());
6377    }
6378
6379    #[test]
6380    /// Tests that receiving a H3_DATAGRAM setting is ok.
6381    fn dgram_setting() {
6382        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6383        config
6384            .load_cert_chain_from_pem_file("examples/cert.crt")
6385            .unwrap();
6386        config
6387            .load_priv_key_from_pem_file("examples/cert.key")
6388            .unwrap();
6389        config.set_application_protos(&[b"h3"]).unwrap();
6390        config.set_initial_max_data(70);
6391        config.set_initial_max_stream_data_bidi_local(150);
6392        config.set_initial_max_stream_data_bidi_remote(150);
6393        config.set_initial_max_stream_data_uni(150);
6394        config.set_initial_max_streams_bidi(100);
6395        config.set_initial_max_streams_uni(5);
6396        config.enable_dgram(true, 1000, 1000);
6397        config.verify_peer(false);
6398
6399        let h3_config = Config::new().unwrap();
6400
6401        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6402        assert_eq!(s.pipe.handshake(), Ok(()));
6403
6404        s.client.send_settings(&mut s.pipe.client).unwrap();
6405        assert_eq!(s.pipe.advance(), Ok(()));
6406
6407        // Before processing SETTINGS (via poll), HTTP/3 DATAGRAMS are not
6408        // enabled.
6409        assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
6410
6411        // When everything is ok, poll returns Done and DATAGRAM is enabled.
6412        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6413        assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));
6414
6415        // Now detect things on the client
6416        s.server.send_settings(&mut s.pipe.server).unwrap();
6417        assert_eq!(s.pipe.advance(), Ok(()));
6418        assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
6419        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6420        assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
6421    }
6422
6423    #[test]
6424    /// Tests that receiving a H3_DATAGRAM setting when no TP is set generates
6425    /// an error.
6426    fn dgram_setting_no_tp() {
6427        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6428        config
6429            .load_cert_chain_from_pem_file("examples/cert.crt")
6430            .unwrap();
6431        config
6432            .load_priv_key_from_pem_file("examples/cert.key")
6433            .unwrap();
6434        config.set_application_protos(&[b"h3"]).unwrap();
6435        config.set_initial_max_data(70);
6436        config.set_initial_max_stream_data_bidi_local(150);
6437        config.set_initial_max_stream_data_bidi_remote(150);
6438        config.set_initial_max_stream_data_uni(150);
6439        config.set_initial_max_streams_bidi(100);
6440        config.set_initial_max_streams_uni(5);
6441        config.verify_peer(false);
6442
6443        let h3_config = Config::new().unwrap();
6444
6445        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6446        assert_eq!(s.pipe.handshake(), Ok(()));
6447
6448        s.client.control_stream_id = Some(
6449            s.client
6450                .open_uni_stream(
6451                    &mut s.pipe.client,
6452                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6453                )
6454                .unwrap(),
6455        );
6456
6457        let settings = frame::Frame::Settings {
6458            max_field_section_size: None,
6459            qpack_max_table_capacity: None,
6460            qpack_blocked_streams: None,
6461            connect_protocol_enabled: None,
6462            h3_datagram: Some(1),
6463            grease: None,
6464            additional_settings: Default::default(),
6465            raw: Default::default(),
6466        };
6467
6468        s.send_frame_client(settings, s.client.control_stream_id.unwrap(), false)
6469            .unwrap();
6470
6471        assert_eq!(s.pipe.advance(), Ok(()));
6472
6473        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6474    }
6475
6476    #[test]
6477    /// Tests that receiving SETTINGS with prohibited values generates an error.
6478    fn settings_h2_prohibited() {
6479        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6480        config
6481            .load_cert_chain_from_pem_file("examples/cert.crt")
6482            .unwrap();
6483        config
6484            .load_priv_key_from_pem_file("examples/cert.key")
6485            .unwrap();
6486        config.set_application_protos(&[b"h3"]).unwrap();
6487        config.set_initial_max_data(70);
6488        config.set_initial_max_stream_data_bidi_local(150);
6489        config.set_initial_max_stream_data_bidi_remote(150);
6490        config.set_initial_max_stream_data_uni(150);
6491        config.set_initial_max_streams_bidi(100);
6492        config.set_initial_max_streams_uni(5);
6493        config.verify_peer(false);
6494
6495        let h3_config = Config::new().unwrap();
6496
6497        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6498        assert_eq!(s.pipe.handshake(), Ok(()));
6499
6500        s.client.control_stream_id = Some(
6501            s.client
6502                .open_uni_stream(
6503                    &mut s.pipe.client,
6504                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6505                )
6506                .unwrap(),
6507        );
6508
6509        s.server.control_stream_id = Some(
6510            s.server
6511                .open_uni_stream(
6512                    &mut s.pipe.server,
6513                    stream::HTTP3_CONTROL_STREAM_TYPE_ID,
6514                )
6515                .unwrap(),
6516        );
6517
6518        let frame_payload_len = 2u64;
6519        let settings = [
6520            frame::SETTINGS_FRAME_TYPE_ID as u8,
6521            frame_payload_len as u8,
6522            0x2, // 0x2 is a reserved setting type
6523            1,
6524        ];
6525
6526        s.send_arbitrary_stream_data_client(
6527            &settings,
6528            s.client.control_stream_id.unwrap(),
6529            false,
6530        )
6531        .unwrap();
6532
6533        s.send_arbitrary_stream_data_server(
6534            &settings,
6535            s.server.control_stream_id.unwrap(),
6536            false,
6537        )
6538        .unwrap();
6539
6540        assert_eq!(s.pipe.advance(), Ok(()));
6541
6542        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::SettingsError));
6543
6544        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::SettingsError));
6545    }
6546
6547    #[test]
6548    /// Tests that setting SETTINGS with prohibited values generates an error.
6549    fn set_prohibited_additional_settings() {
6550        let mut h3_config = Config::new().unwrap();
6551        assert_eq!(
6552            h3_config.set_additional_settings(vec![(
6553                frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
6554                43
6555            )]),
6556            Err(Error::SettingsError)
6557        );
6558        assert_eq!(
6559            h3_config.set_additional_settings(vec![(
6560                frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
6561                43
6562            )]),
6563            Err(Error::SettingsError)
6564        );
6565        assert_eq!(
6566            h3_config.set_additional_settings(vec![(
6567                frame::SETTINGS_QPACK_BLOCKED_STREAMS,
6568                43
6569            )]),
6570            Err(Error::SettingsError)
6571        );
6572        assert_eq!(
6573            h3_config.set_additional_settings(vec![(
6574                frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
6575                43
6576            )]),
6577            Err(Error::SettingsError)
6578        );
6579        assert_eq!(
6580            h3_config
6581                .set_additional_settings(vec![(frame::SETTINGS_H3_DATAGRAM, 43)]),
6582            Err(Error::SettingsError)
6583        );
6584    }
6585
6586    #[test]
6587    /// Tests additional settings are actually exchanged by the peers.
6588    fn set_additional_settings() {
6589        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6590        config
6591            .load_cert_chain_from_pem_file("examples/cert.crt")
6592            .unwrap();
6593        config
6594            .load_priv_key_from_pem_file("examples/cert.key")
6595            .unwrap();
6596        config.set_application_protos(&[b"h3"]).unwrap();
6597        config.set_initial_max_data(70);
6598        config.set_initial_max_stream_data_bidi_local(150);
6599        config.set_initial_max_stream_data_bidi_remote(150);
6600        config.set_initial_max_stream_data_uni(150);
6601        config.set_initial_max_streams_bidi(100);
6602        config.set_initial_max_streams_uni(5);
6603        config.verify_peer(false);
6604        config.grease(false);
6605
6606        let mut h3_config = Config::new().unwrap();
6607        h3_config
6608            .set_additional_settings(vec![(42, 43), (44, 45)])
6609            .unwrap();
6610
6611        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6612        assert_eq!(s.pipe.handshake(), Ok(()));
6613
6614        assert_eq!(s.pipe.advance(), Ok(()));
6615
6616        s.client.send_settings(&mut s.pipe.client).unwrap();
6617        assert_eq!(s.pipe.advance(), Ok(()));
6618        assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::Done));
6619
6620        s.server.send_settings(&mut s.pipe.server).unwrap();
6621        assert_eq!(s.pipe.advance(), Ok(()));
6622        assert_eq!(s.client.poll(&mut s.pipe.client), Err(Error::Done));
6623
6624        assert_eq!(
6625            s.server.peer_settings_raw(),
6626            Some(&[(42, 43), (44, 45)][..])
6627        );
6628        assert_eq!(
6629            s.client.peer_settings_raw(),
6630            Some(&[(42, 43), (44, 45)][..])
6631        );
6632    }
6633
6634    #[test]
6635    /// Send a single DATAGRAM.
6636    fn single_dgram() {
6637        let mut buf = [0; 65535];
6638        let mut s = Session::new().unwrap();
6639        s.handshake().unwrap();
6640
6641        // We'll send default data of 10 bytes on flow ID 0.
6642        let result = (11, 0, 1);
6643
6644        s.send_dgram_client(0).unwrap();
6645
6646        assert_eq!(s.poll_server(), Err(Error::Done));
6647        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6648
6649        s.send_dgram_server(0).unwrap();
6650        assert_eq!(s.poll_client(), Err(Error::Done));
6651        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6652    }
6653
6654    #[test]
6655    /// Send multiple DATAGRAMs.
6656    fn multiple_dgram() {
6657        let mut buf = [0; 65535];
6658        let mut s = Session::new().unwrap();
6659        s.handshake().unwrap();
6660
6661        // We'll send default data of 10 bytes on flow ID 0.
6662        let result = (11, 0, 1);
6663
6664        s.send_dgram_client(0).unwrap();
6665        s.send_dgram_client(0).unwrap();
6666        s.send_dgram_client(0).unwrap();
6667
6668        assert_eq!(s.poll_server(), Err(Error::Done));
6669        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6670        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6671        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6672        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6673
6674        s.send_dgram_server(0).unwrap();
6675        s.send_dgram_server(0).unwrap();
6676        s.send_dgram_server(0).unwrap();
6677
6678        assert_eq!(s.poll_client(), Err(Error::Done));
6679        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6680        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6681        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6682        assert_eq!(s.recv_dgram_client(&mut buf), Err(Error::Done));
6683    }
6684
6685    #[test]
6686    /// Send more DATAGRAMs than the send queue allows.
6687    fn multiple_dgram_overflow() {
6688        let mut buf = [0; 65535];
6689        let mut s = Session::new().unwrap();
6690        s.handshake().unwrap();
6691
6692        // We'll send default data of 10 bytes on flow ID 0.
6693        let result = (11, 0, 1);
6694
6695        // Five DATAGRAMs
6696        s.send_dgram_client(0).unwrap();
6697        s.send_dgram_client(0).unwrap();
6698        s.send_dgram_client(0).unwrap();
6699        s.send_dgram_client(0).unwrap();
6700        s.send_dgram_client(0).unwrap();
6701
6702        // Only 3 independent DATAGRAMs to read events will fire.
6703        assert_eq!(s.poll_server(), Err(Error::Done));
6704        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6705        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6706        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6707        assert_eq!(s.recv_dgram_server(&mut buf), Err(Error::Done));
6708    }
6709
6710    #[test]
6711    /// Send a single DATAGRAM and request.
6712    fn poll_datagram_cycling_no_read() {
6713        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6714        config
6715            .load_cert_chain_from_pem_file("examples/cert.crt")
6716            .unwrap();
6717        config
6718            .load_priv_key_from_pem_file("examples/cert.key")
6719            .unwrap();
6720        config.set_application_protos(&[b"h3"]).unwrap();
6721        config.set_initial_max_data(1500);
6722        config.set_initial_max_stream_data_bidi_local(150);
6723        config.set_initial_max_stream_data_bidi_remote(150);
6724        config.set_initial_max_stream_data_uni(150);
6725        config.set_initial_max_streams_bidi(100);
6726        config.set_initial_max_streams_uni(5);
6727        config.verify_peer(false);
6728        config.enable_dgram(true, 100, 100);
6729
6730        let h3_config = Config::new().unwrap();
6731        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6732        s.handshake().unwrap();
6733
6734        // Send request followed by DATAGRAM on client side.
6735        let (stream, req) = s.send_request(false).unwrap();
6736
6737        s.send_body_client(stream, true).unwrap();
6738
6739        let ev_headers = Event::Headers {
6740            list: req,
6741            more_frames: true,
6742        };
6743
6744        s.send_dgram_client(0).unwrap();
6745
6746        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6747        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6748
6749        assert_eq!(s.poll_server(), Err(Error::Done));
6750    }
6751
6752    #[test]
6753    /// Send a single DATAGRAM and request.
6754    fn poll_datagram_single_read() {
6755        let mut buf = [0; 65535];
6756
6757        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6758        config
6759            .load_cert_chain_from_pem_file("examples/cert.crt")
6760            .unwrap();
6761        config
6762            .load_priv_key_from_pem_file("examples/cert.key")
6763            .unwrap();
6764        config.set_application_protos(&[b"h3"]).unwrap();
6765        config.set_initial_max_data(1500);
6766        config.set_initial_max_stream_data_bidi_local(150);
6767        config.set_initial_max_stream_data_bidi_remote(150);
6768        config.set_initial_max_stream_data_uni(150);
6769        config.set_initial_max_streams_bidi(100);
6770        config.set_initial_max_streams_uni(5);
6771        config.verify_peer(false);
6772        config.enable_dgram(true, 100, 100);
6773
6774        let h3_config = Config::new().unwrap();
6775        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6776        s.handshake().unwrap();
6777
6778        // We'll send default data of 10 bytes on flow ID 0.
6779        let result = (11, 0, 1);
6780
6781        // Send request followed by DATAGRAM on client side.
6782        let (stream, req) = s.send_request(false).unwrap();
6783
6784        let body = s.send_body_client(stream, true).unwrap();
6785
6786        let mut recv_buf = vec![0; body.len()];
6787
6788        let ev_headers = Event::Headers {
6789            list: req,
6790            more_frames: true,
6791        };
6792
6793        s.send_dgram_client(0).unwrap();
6794
6795        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6796        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6797
6798        assert_eq!(s.poll_server(), Err(Error::Done));
6799
6800        assert_eq!(s.recv_dgram_server(&mut buf), Ok(result));
6801
6802        assert_eq!(s.poll_server(), Err(Error::Done));
6803
6804        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6805        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6806        assert_eq!(s.poll_server(), Err(Error::Done));
6807
6808        // Send response followed by DATAGRAM on server side
6809        let resp = s.send_response(stream, false).unwrap();
6810
6811        let body = s.send_body_server(stream, true).unwrap();
6812
6813        let mut recv_buf = vec![0; body.len()];
6814
6815        let ev_headers = Event::Headers {
6816            list: resp,
6817            more_frames: true,
6818        };
6819
6820        s.send_dgram_server(0).unwrap();
6821
6822        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6823        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6824
6825        assert_eq!(s.poll_client(), Err(Error::Done));
6826
6827        assert_eq!(s.recv_dgram_client(&mut buf), Ok(result));
6828
6829        assert_eq!(s.poll_client(), Err(Error::Done));
6830
6831        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6832
6833        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6834        assert_eq!(s.poll_client(), Err(Error::Done));
6835    }
6836
6837    #[test]
6838    /// Send multiple DATAGRAMs and requests.
6839    fn poll_datagram_multi_read() {
6840        let mut buf = [0; 65535];
6841
6842        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
6843        config
6844            .load_cert_chain_from_pem_file("examples/cert.crt")
6845            .unwrap();
6846        config
6847            .load_priv_key_from_pem_file("examples/cert.key")
6848            .unwrap();
6849        config.set_application_protos(&[b"h3"]).unwrap();
6850        config.set_initial_max_data(1500);
6851        config.set_initial_max_stream_data_bidi_local(150);
6852        config.set_initial_max_stream_data_bidi_remote(150);
6853        config.set_initial_max_stream_data_uni(150);
6854        config.set_initial_max_streams_bidi(100);
6855        config.set_initial_max_streams_uni(5);
6856        config.verify_peer(false);
6857        config.enable_dgram(true, 100, 100);
6858
6859        let h3_config = Config::new().unwrap();
6860        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
6861        s.handshake().unwrap();
6862
6863        // 10 bytes on flow ID 0 and 2.
6864        let flow_0_result = (11, 0, 1);
6865        let flow_2_result = (11, 2, 1);
6866
6867        // Send requests followed by DATAGRAMs on client side.
6868        let (stream, req) = s.send_request(false).unwrap();
6869
6870        let body = s.send_body_client(stream, true).unwrap();
6871
6872        let mut recv_buf = vec![0; body.len()];
6873
6874        let ev_headers = Event::Headers {
6875            list: req,
6876            more_frames: true,
6877        };
6878
6879        s.send_dgram_client(0).unwrap();
6880        s.send_dgram_client(0).unwrap();
6881        s.send_dgram_client(0).unwrap();
6882        s.send_dgram_client(0).unwrap();
6883        s.send_dgram_client(0).unwrap();
6884        s.send_dgram_client(2).unwrap();
6885        s.send_dgram_client(2).unwrap();
6886        s.send_dgram_client(2).unwrap();
6887        s.send_dgram_client(2).unwrap();
6888        s.send_dgram_client(2).unwrap();
6889
6890        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
6891        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
6892
6893        assert_eq!(s.poll_server(), Err(Error::Done));
6894
6895        // Second cycle, start to read
6896        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6897        assert_eq!(s.poll_server(), Err(Error::Done));
6898        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6899        assert_eq!(s.poll_server(), Err(Error::Done));
6900        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6901        assert_eq!(s.poll_server(), Err(Error::Done));
6902
6903        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
6904        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
6905
6906        assert_eq!(s.poll_server(), Err(Error::Done));
6907
6908        // Third cycle.
6909        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6910        assert_eq!(s.poll_server(), Err(Error::Done));
6911        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
6912        assert_eq!(s.poll_server(), Err(Error::Done));
6913        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6914        assert_eq!(s.poll_server(), Err(Error::Done));
6915        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6916        assert_eq!(s.poll_server(), Err(Error::Done));
6917        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6918        assert_eq!(s.poll_server(), Err(Error::Done));
6919        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6920        assert_eq!(s.poll_server(), Err(Error::Done));
6921        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
6922        assert_eq!(s.poll_server(), Err(Error::Done));
6923
6924        // Send response followed by DATAGRAM on server side
6925        let resp = s.send_response(stream, false).unwrap();
6926
6927        let body = s.send_body_server(stream, true).unwrap();
6928
6929        let mut recv_buf = vec![0; body.len()];
6930
6931        let ev_headers = Event::Headers {
6932            list: resp,
6933            more_frames: true,
6934        };
6935
6936        s.send_dgram_server(0).unwrap();
6937        s.send_dgram_server(0).unwrap();
6938        s.send_dgram_server(0).unwrap();
6939        s.send_dgram_server(0).unwrap();
6940        s.send_dgram_server(0).unwrap();
6941        s.send_dgram_server(2).unwrap();
6942        s.send_dgram_server(2).unwrap();
6943        s.send_dgram_server(2).unwrap();
6944        s.send_dgram_server(2).unwrap();
6945        s.send_dgram_server(2).unwrap();
6946
6947        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
6948        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
6949
6950        assert_eq!(s.poll_client(), Err(Error::Done));
6951
6952        // Second cycle, start to read
6953        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6954        assert_eq!(s.poll_client(), Err(Error::Done));
6955        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6956        assert_eq!(s.poll_client(), Err(Error::Done));
6957        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6958        assert_eq!(s.poll_client(), Err(Error::Done));
6959
6960        assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
6961        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
6962
6963        assert_eq!(s.poll_client(), Err(Error::Done));
6964
6965        // Third cycle.
6966        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6967        assert_eq!(s.poll_client(), Err(Error::Done));
6968        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_0_result));
6969        assert_eq!(s.poll_client(), Err(Error::Done));
6970        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6971        assert_eq!(s.poll_client(), Err(Error::Done));
6972        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6973        assert_eq!(s.poll_client(), Err(Error::Done));
6974        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6975        assert_eq!(s.poll_client(), Err(Error::Done));
6976        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6977        assert_eq!(s.poll_client(), Err(Error::Done));
6978        assert_eq!(s.recv_dgram_client(&mut buf), Ok(flow_2_result));
6979        assert_eq!(s.poll_client(), Err(Error::Done));
6980    }
6981
6982    #[test]
6983    /// Tests that the Finished event is not issued for streams of unknown type
6984    /// (e.g. GREASE).
6985    fn finished_is_for_requests() {
6986        let mut s = Session::new().unwrap();
6987        s.handshake().unwrap();
6988
6989        assert_eq!(s.poll_client(), Err(Error::Done));
6990        assert_eq!(s.poll_server(), Err(Error::Done));
6991
6992        assert_eq!(s.client.open_grease_stream(&mut s.pipe.client), Ok(()));
6993        assert_eq!(s.pipe.advance(), Ok(()));
6994
6995        assert_eq!(s.poll_client(), Err(Error::Done));
6996        assert_eq!(s.poll_server(), Err(Error::Done));
6997    }
6998
6999    #[test]
7000    /// Tests that streams are marked as finished only once.
7001    fn finished_once() {
7002        let mut s = Session::new().unwrap();
7003        s.handshake().unwrap();
7004
7005        let (stream, req) = s.send_request(false).unwrap();
7006        let body = s.send_body_client(stream, true).unwrap();
7007
7008        let mut recv_buf = vec![0; body.len()];
7009
7010        let ev_headers = Event::Headers {
7011            list: req,
7012            more_frames: true,
7013        };
7014
7015        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7016        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7017
7018        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7019        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7020
7021        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Err(Error::Done));
7022        assert_eq!(s.poll_server(), Err(Error::Done));
7023    }
7024
7025    #[test]
7026    /// Tests that the Data event is properly re-armed.
7027    fn data_event_rearm() {
7028        let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
7029
7030        let mut s = Session::new().unwrap();
7031        s.handshake().unwrap();
7032
7033        let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
7034
7035        let mut recv_buf = vec![0; bytes.len()];
7036
7037        let r1_ev_headers = Event::Headers {
7038            list: r1_hdrs,
7039            more_frames: true,
7040        };
7041
7042        // Manually send an incomplete DATA frame (i.e. the frame size is longer
7043        // than the actual data sent).
7044        {
7045            let mut d = [42; 10];
7046            let mut b = octets::OctetsMut::with_slice(&mut d);
7047
7048            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7049            b.put_varint(bytes.len() as u64).unwrap();
7050            let off = b.off();
7051            s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
7052
7053            assert_eq!(
7054                s.pipe.client.stream_send(r1_id, &bytes[..5], false),
7055                Ok(5)
7056            );
7057
7058            s.advance().ok();
7059        }
7060
7061        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
7062        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7063        assert_eq!(s.poll_server(), Err(Error::Done));
7064
7065        // Read the available body data.
7066        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
7067
7068        // Send the remaining DATA payload.
7069        assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
7070        s.advance().ok();
7071
7072        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7073        assert_eq!(s.poll_server(), Err(Error::Done));
7074
7075        // Read the rest of the body data.
7076        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
7077        assert_eq!(s.poll_server(), Err(Error::Done));
7078
7079        // Send more data.
7080        let r1_body = s.send_body_client(r1_id, false).unwrap();
7081
7082        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7083        assert_eq!(s.poll_server(), Err(Error::Done));
7084
7085        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
7086
7087        // Send a new request to ensure cross-stream events don't break rearming.
7088        let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
7089        let r2_ev_headers = Event::Headers {
7090            list: r2_hdrs,
7091            more_frames: true,
7092        };
7093        let r2_body = s.send_body_client(r2_id, false).unwrap();
7094
7095        s.advance().ok();
7096
7097        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
7098        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
7099        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
7100        assert_eq!(s.poll_server(), Err(Error::Done));
7101
7102        // Send more data on request 1, then trailing HEADERS.
7103        let r1_body = s.send_body_client(r1_id, false).unwrap();
7104
7105        let trailers = vec![Header::new(b"hello", b"world")];
7106
7107        s.client
7108            .send_headers(&mut s.pipe.client, r1_id, &trailers, true)
7109            .unwrap();
7110
7111        let r1_ev_trailers = Event::Headers {
7112            list: trailers.clone(),
7113            more_frames: false,
7114        };
7115
7116        s.advance().ok();
7117
7118        assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
7119        assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
7120
7121        assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
7122        assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
7123        assert_eq!(s.poll_server(), Err(Error::Done));
7124
7125        // Send more data on request 2, then trailing HEADERS.
7126        let r2_body = s.send_body_client(r2_id, false).unwrap();
7127
7128        s.client
7129            .send_headers(&mut s.pipe.client, r2_id, &trailers, false)
7130            .unwrap();
7131
7132        let r2_ev_trailers = Event::Headers {
7133            list: trailers,
7134            more_frames: true,
7135        };
7136
7137        s.advance().ok();
7138
7139        assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
7140        assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
7141        assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
7142        assert_eq!(s.poll_server(), Err(Error::Done));
7143
7144        let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
7145
7146        let r3_ev_headers = Event::Headers {
7147            list: r3_hdrs,
7148            more_frames: true,
7149        };
7150
7151        // Manually send an incomplete DATA frame (i.e. only the header is sent).
7152        {
7153            let mut d = [42; 10];
7154            let mut b = octets::OctetsMut::with_slice(&mut d);
7155
7156            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7157            b.put_varint(bytes.len() as u64).unwrap();
7158            let off = b.off();
7159            s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
7160
7161            s.advance().ok();
7162        }
7163
7164        assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
7165        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7166        assert_eq!(s.poll_server(), Err(Error::Done));
7167
7168        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
7169
7170        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
7171
7172        s.advance().ok();
7173
7174        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7175        assert_eq!(s.poll_server(), Err(Error::Done));
7176
7177        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7178
7179        assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
7180        s.advance().ok();
7181
7182        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7183        assert_eq!(s.poll_server(), Err(Error::Done));
7184
7185        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
7186
7187        // Buffer multiple data frames.
7188        let body = s.send_body_client(r3_id, false).unwrap();
7189        s.send_body_client(r3_id, false).unwrap();
7190        s.send_body_client(r3_id, false).unwrap();
7191
7192        assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
7193        assert_eq!(s.poll_server(), Err(Error::Done));
7194
7195        {
7196            let mut d = [42; 10];
7197            let mut b = octets::OctetsMut::with_slice(&mut d);
7198
7199            b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
7200            b.put_varint(0).unwrap();
7201            let off = b.off();
7202            s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
7203
7204            s.advance().ok();
7205        }
7206
7207        let mut recv_buf = vec![0; bytes.len() * 3];
7208
7209        assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
7210    }
7211
7212    #[test]
7213    /// Tests that the Datagram event is properly re-armed.
7214    fn dgram_event_rearm() {
7215        let mut buf = [0; 65535];
7216
7217        let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
7218        config
7219            .load_cert_chain_from_pem_file("examples/cert.crt")
7220            .unwrap();
7221        config
7222            .load_priv_key_from_pem_file("examples/cert.key")
7223            .unwrap();
7224        config.set_application_protos(&[b"h3"]).unwrap();
7225        config.set_initial_max_data(1500);
7226        config.set_initial_max_stream_data_bidi_local(150);
7227        config.set_initial_max_stream_data_bidi_remote(150);
7228        config.set_initial_max_stream_data_uni(150);
7229        config.set_initial_max_streams_bidi(100);
7230        config.set_initial_max_streams_uni(5);
7231        config.verify_peer(false);
7232        config.enable_dgram(true, 100, 100);
7233
7234        let h3_config = Config::new().unwrap();
7235        let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
7236        s.handshake().unwrap();
7237
7238        // 10 bytes on flow ID 0 and 2.
7239        let flow_0_result = (11, 0, 1);
7240        let flow_2_result = (11, 2, 1);
7241
7242        // Send requests followed by DATAGRAMs on client side.
7243        let (stream, req) = s.send_request(false).unwrap();
7244
7245        let body = s.send_body_client(stream, true).unwrap();
7246
7247        let mut recv_buf = vec![0; body.len()];
7248
7249        let ev_headers = Event::Headers {
7250            list: req,
7251            more_frames: true,
7252        };
7253
7254        s.send_dgram_client(0).unwrap();
7255        s.send_dgram_client(0).unwrap();
7256        s.send_dgram_client(2).unwrap();
7257        s.send_dgram_client(2).unwrap();
7258
7259        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7260        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7261
7262        assert_eq!(s.poll_server(), Err(Error::Done));
7263        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7264
7265        assert_eq!(s.poll_server(), Err(Error::Done));
7266        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7267
7268        assert_eq!(s.poll_server(), Err(Error::Done));
7269        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7270
7271        assert_eq!(s.poll_server(), Err(Error::Done));
7272        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7273
7274        assert_eq!(s.poll_server(), Err(Error::Done));
7275
7276        s.send_dgram_client(0).unwrap();
7277        s.send_dgram_client(2).unwrap();
7278
7279        assert_eq!(s.poll_server(), Err(Error::Done));
7280
7281        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_0_result));
7282        assert_eq!(s.poll_server(), Err(Error::Done));
7283
7284        assert_eq!(s.recv_dgram_server(&mut buf), Ok(flow_2_result));
7285        assert_eq!(s.poll_server(), Err(Error::Done));
7286
7287        assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
7288        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7289
7290        // Verify that dgram counts are incremented.
7291        assert_eq!(s.pipe.client.dgram_sent_count, 6);
7292        assert_eq!(s.pipe.client.dgram_recv_count, 0);
7293        assert_eq!(s.pipe.server.dgram_sent_count, 0);
7294        assert_eq!(s.pipe.server.dgram_recv_count, 6);
7295
7296        let server_path = s.pipe.server.paths.get_active().expect("no active");
7297        let client_path = s.pipe.client.paths.get_active().expect("no active");
7298        assert_eq!(client_path.dgram_sent_count, 6);
7299        assert_eq!(client_path.dgram_recv_count, 0);
7300        assert_eq!(server_path.dgram_sent_count, 0);
7301        assert_eq!(server_path.dgram_recv_count, 6);
7302    }
7303
7304    #[test]
7305    fn reset_stream() {
7306        let mut buf = [0; 65535];
7307
7308        let mut s = Session::new().unwrap();
7309        s.handshake().unwrap();
7310
7311        // Client sends request.
7312        let (stream, req) = s.send_request(false).unwrap();
7313
7314        let ev_headers = Event::Headers {
7315            list: req,
7316            more_frames: true,
7317        };
7318
7319        // Server sends response and closes stream.
7320        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7321        assert_eq!(s.poll_server(), Err(Error::Done));
7322
7323        let resp = s.send_response(stream, true).unwrap();
7324
7325        let ev_headers = Event::Headers {
7326            list: resp,
7327            more_frames: false,
7328        };
7329
7330        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7331        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7332        assert_eq!(s.poll_client(), Err(Error::Done));
7333
7334        // Client sends RESET_STREAM, closing stream.
7335        let frames = [crate::frame::Frame::ResetStream {
7336            stream_id: stream,
7337            error_code: 42,
7338            final_size: 68,
7339        }];
7340
7341        let pkt_type = crate::packet::Type::Short;
7342        assert_eq!(
7343            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7344            Ok(39)
7345        );
7346
7347        // Server issues Reset event for the stream.
7348        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7349        assert_eq!(s.poll_server(), Err(Error::Done));
7350
7351        // Sending RESET_STREAM again shouldn't trigger another Reset event.
7352        assert_eq!(
7353            s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
7354            Ok(39)
7355        );
7356
7357        assert_eq!(s.poll_server(), Err(Error::Done));
7358    }
7359
7360    /// The client shuts down the stream's write direction, the server
7361    /// shuts down its side with fin
7362    #[test]
7363    fn client_shutdown_write_server_fin() {
7364        let mut buf = [0; 65535];
7365        let mut s = Session::new().unwrap();
7366        s.handshake().unwrap();
7367
7368        // Client sends request.
7369        let (stream, req) = s.send_request(false).unwrap();
7370
7371        let ev_headers = Event::Headers {
7372            list: req,
7373            more_frames: true,
7374        };
7375
7376        // Server sends response and closes stream.
7377        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7378        assert_eq!(s.poll_server(), Err(Error::Done));
7379
7380        let resp = s.send_response(stream, true).unwrap();
7381
7382        let ev_headers = Event::Headers {
7383            list: resp,
7384            more_frames: false,
7385        };
7386
7387        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7388        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7389        assert_eq!(s.poll_client(), Err(Error::Done));
7390
7391        // Client shuts down stream ==> sends RESET_STREAM
7392        assert_eq!(
7393            s.pipe
7394                .client
7395                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7396            Ok(())
7397        );
7398        assert_eq!(s.advance(), Ok(()));
7399
7400        // Server sees the Reset event for the stream.
7401        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7402        assert_eq!(s.poll_server(), Err(Error::Done));
7403
7404        // Streams have been collected by quiche
7405        assert!(s.pipe.server.streams.is_collected(stream));
7406        assert!(s.pipe.client.streams.is_collected(stream));
7407
7408        // Client sends another request, server sends response without fin
7409        //
7410        let (stream, req) = s.send_request(false).unwrap();
7411
7412        let ev_headers = Event::Headers {
7413            list: req,
7414            more_frames: true,
7415        };
7416
7417        // Check that server has received the request.
7418        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7419        assert_eq!(s.poll_server(), Err(Error::Done));
7420
7421        // Server sends reponse without closing the stream.
7422        let resp = s.send_response(stream, false).unwrap();
7423
7424        let ev_headers = Event::Headers {
7425            list: resp,
7426            more_frames: true,
7427        };
7428
7429        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7430        assert_eq!(s.poll_client(), Err(Error::Done));
7431
7432        // Client shuts down stream ==> sends RESET_STREAM
7433        assert_eq!(
7434            s.pipe
7435                .client
7436                .stream_shutdown(stream, crate::Shutdown::Write, 42),
7437            Ok(())
7438        );
7439        assert_eq!(s.advance(), Ok(()));
7440
7441        // Server sees the Reset event for the stream.
7442        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(42))));
7443        assert_eq!(s.poll_server(), Err(Error::Done));
7444
7445        // Server sends body and closes the stream.
7446        s.send_body_server(stream, true).unwrap();
7447
7448        // Stream has been collected on server by quiche
7449        assert!(s.pipe.server.streams.is_collected(stream));
7450        // Client stream has not been collected, the client needs to
7451        // read the fin from the stream first.
7452        assert!(!s.pipe.client.streams.is_collected(stream));
7453        assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
7454        s.recv_body_client(stream, &mut buf).unwrap();
7455        assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
7456        assert_eq!(s.poll_client(), Err(Error::Done));
7457        assert!(s.pipe.client.streams.is_collected(stream));
7458    }
7459
7460    #[test]
7461    fn client_shutdown_read() {
7462        let mut buf = [0; 65535];
7463        let mut s = Session::new().unwrap();
7464        s.handshake().unwrap();
7465
7466        // Client sends request and leaves stream open.
7467        let (stream, req) = s.send_request(false).unwrap();
7468
7469        let ev_headers = Event::Headers {
7470            list: req,
7471            more_frames: true,
7472        };
7473
7474        // Server sends response and leaves stream open.
7475        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7476        assert_eq!(s.poll_server(), Err(Error::Done));
7477
7478        let resp = s.send_response(stream, false).unwrap();
7479
7480        let ev_headers = Event::Headers {
7481            list: resp,
7482            more_frames: true,
7483        };
7484
7485        assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
7486        assert_eq!(s.poll_client(), Err(Error::Done));
7487        // Client shuts down read
7488        assert_eq!(
7489            s.pipe
7490                .client
7491                .stream_shutdown(stream, crate::Shutdown::Read, 42),
7492            Ok(())
7493        );
7494        assert_eq!(s.advance(), Ok(()));
7495
7496        // Stream is writable on server side, but returns StreamStopped
7497        assert_eq!(s.poll_server(), Err(Error::Done));
7498        let writables: Vec<u64> = s.pipe.server.writable().collect();
7499        assert!(writables.contains(&stream));
7500        assert_eq!(
7501            s.send_body_server(stream, false),
7502            Err(Error::TransportError(crate::Error::StreamStopped(42)))
7503        );
7504
7505        // Client needs to finish its side by sending a fin
7506        assert_eq!(
7507            s.client.send_body(&mut s.pipe.client, stream, &[], true),
7508            Ok(0)
7509        );
7510        assert_eq!(s.advance(), Ok(()));
7511        // Note, we get an Event::Data for an empty buffer today. But it
7512        // would also be fine to not get it.
7513        assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
7514        assert_eq!(s.recv_body_server(stream, &mut buf), Err(Error::Done));
7515        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7516        assert_eq!(s.poll_server(), Err(Error::Done));
7517
7518        // Since the client has already send a fin, the stream is collected
7519        // on both client and server
7520        assert!(s.pipe.client.streams.is_collected(stream));
7521        assert!(s.pipe.server.streams.is_collected(stream));
7522    }
7523
7524    #[test]
7525    fn reset_finished_at_server() {
7526        let mut s = Session::new().unwrap();
7527        s.handshake().unwrap();
7528
7529        // Client sends HEADERS and doesn't fin
7530        let (stream, _req) = s.send_request(false).unwrap();
7531
7532        // ..then Client sends RESET_STREAM
7533        assert_eq!(
7534            s.pipe.client.stream_shutdown(0, crate::Shutdown::Write, 0),
7535            Ok(())
7536        );
7537
7538        assert_eq!(s.pipe.advance(), Ok(()));
7539
7540        // Server receives just a reset
7541        assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
7542        assert_eq!(s.poll_server(), Err(Error::Done));
7543
7544        // Client sends HEADERS and fin
7545        let (stream, req) = s.send_request(true).unwrap();
7546
7547        // ..then Client sends RESET_STREAM
7548        assert_eq!(
7549            s.pipe.client.stream_shutdown(4, crate::Shutdown::Write, 0),
7550            Ok(())
7551        );
7552
7553        let ev_headers = Event::Headers {
7554            list: req,
7555            more_frames: false,
7556        };
7557
7558        // Server receives headers and fin.
7559        assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
7560        assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
7561        assert_eq!(s.poll_server(), Err(Error::Done));
7562    }
7563
#[test]
/// A client resets its request stream after the server has already been
/// notified of headers and data; the server then only polls for events.
fn reset_finished_at_server_with_data_pending() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // The request goes out without fin, followed by body data.
    let (stream_id, request) = session.send_request(false).unwrap();
    assert!(session.send_body_client(stream_id, false).is_ok());
    assert_eq!(session.pipe.advance(), Ok(()));

    // The server is notified of the headers first, then of the data.
    assert_eq!(
        session.poll_server(),
        Ok((
            stream_id,
            Event::Headers {
                list: request,
                more_frames: true,
            }
        ))
    );
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));

    // The client now shuts down its write side, i.e. sends RESET_STREAM.
    let shutdown = session.pipe.client.stream_shutdown(
        stream_id,
        crate::Shutdown::Write,
        0,
    );
    assert_eq!(shutdown, Ok(()));
    assert_eq!(session.pipe.advance(), Ok(()));

    // Without ever reading the body, the server polls: it gets the reset,
    // then nothing more, and no stream is left readable.
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Reset(0))));
    assert_eq!(session.poll_server(), Err(Error::Done));
    assert_eq!(session.pipe.server.readable().len(), 0);
}
7602
#[test]
/// Same setup as the polling variant, but here the server discovers the
/// reset while trying to read the request body.
fn reset_finished_at_server_with_data_pending_2() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // The request goes out without fin, followed by body data.
    let (stream_id, request) = session.send_request(false).unwrap();
    assert!(session.send_body_client(stream_id, false).is_ok());
    assert_eq!(session.pipe.advance(), Ok(()));

    // The server is notified of the headers first, then of the data.
    assert_eq!(
        session.poll_server(),
        Ok((
            stream_id,
            Event::Headers {
                list: request,
                more_frames: true,
            }
        ))
    );
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));

    // The client now shuts down its write side, i.e. sends RESET_STREAM.
    let shutdown = session.pipe.client.stream_shutdown(
        stream_id,
        crate::Shutdown::Write,
        0,
    );
    assert_eq!(shutdown, Ok(()));
    assert_eq!(session.pipe.advance(), Ok(()));

    // Reading the body surfaces the reset as a transport-level error.
    let mut scratch = [0; 100];
    assert_eq!(
        session.recv_body_server(stream_id, &mut scratch),
        Err(Error::TransportError(crate::Error::StreamReset(0)))
    );

    // Nothing further is reported and no stream is left readable.
    assert_eq!(session.poll_server(), Err(Error::Done));
    assert_eq!(session.pipe.server.readable().len(), 0);
}
7645
#[test]
/// Exercises stream resets around a response: first the server resets a
/// stream whose response was not finished, then a RESET_STREAM frame is
/// injected after a response that was sent with fin.
fn reset_finished_at_client() {
    let mut buf = [0; 65535];
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Client sends HEADERS and doesn't fin
    let (stream, req) = s.send_request(false).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: true,
    };

    // Server receives headers.
    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server sends response and doesn't fin
    s.send_response(stream, false).unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    // .. then Server sends RESET_STREAM
    assert_eq!(
        s.pipe
            .server
            .stream_shutdown(stream, crate::Shutdown::Write, 0),
        Ok(())
    );

    assert_eq!(s.pipe.advance(), Ok(()));

    // Client receives Reset only: after the reset, the client must have no
    // further events queued, and the server has nothing pending either.
    assert_eq!(s.poll_client(), Ok((stream, Event::Reset(0))));
    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Client sends headers and fin.
    let (stream, req) = s.send_request(true).unwrap();

    let ev_headers = Event::Headers {
        list: req,
        more_frames: false,
    };

    // Server receives headers and fin.
    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server sends response and fin
    let resp = s.send_response(stream, true).unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    // ..then a RESET_STREAM frame for the same stream is injected as a
    // hand-built short-header packet.
    // NOTE(review): the frame goes through `send_pkt_to_server`, whose name
    // suggests it is delivered to the server rather than emitted by it --
    // confirm this matches the intent of the test.
    let frames = [crate::frame::Frame::ResetStream {
        stream_id: stream,
        error_code: 42,
        final_size: 68,
    }];

    let pkt_type = crate::packet::Type::Short;
    assert_eq!(
        s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
        Ok(39)
    );

    assert_eq!(s.pipe.advance(), Ok(()));

    let ev_headers = Event::Headers {
        list: resp,
        more_frames: false,
    };

    // Client receives headers and fin.
    assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
7726}
7727
7728#[cfg(feature = "ffi")]
7729mod ffi;
7730#[cfg(feature = "internal")]
7731#[doc(hidden)]
7732pub mod frame;
7733#[cfg(not(feature = "internal"))]
7734mod frame;
7735#[doc(hidden)]
7736pub mod qpack;
7737mod stream;