use std::collections::HashSet;
use std::collections::VecDeque;
#[cfg(feature = "sfv")]
use std::convert::TryFrom;
use std::fmt;
use std::fmt::Write;
#[cfg(feature = "qlog")]
use qlog::events::http3::FrameCreated;
#[cfg(feature = "qlog")]
use qlog::events::http3::FrameParsed;
#[cfg(feature = "qlog")]
use qlog::events::http3::Http3EventType;
#[cfg(feature = "qlog")]
use qlog::events::http3::Http3Frame;
#[cfg(feature = "qlog")]
use qlog::events::http3::Initiator;
#[cfg(feature = "qlog")]
use qlog::events::http3::StreamType;
#[cfg(feature = "qlog")]
use qlog::events::http3::StreamTypeSet;
#[cfg(feature = "qlog")]
use qlog::events::EventData;
#[cfg(feature = "qlog")]
use qlog::events::EventImportance;
#[cfg(feature = "qlog")]
use qlog::events::EventType;
use crate::buffers::BufFactory;
use crate::BufSplit;
/// Application protocol identifiers for HTTP/3; currently only `h3`.
///
/// NOTE(review): presumably meant to be used as the ALPN list of the
/// transport configuration -- confirm against callers.
pub const APPLICATION_PROTOCOL: &[&[u8]] = &[b"h3"];
// Value added to a clamped HTTP/3 urgency before it is handed to the
// transport's `stream_priority()`.
const PRIORITY_URGENCY_OFFSET: u8 = 124;
// Smallest urgency value accepted; lower values are clamped to it.
const PRIORITY_URGENCY_LOWER_BOUND: u8 = 0;
// Largest urgency value accepted; higher values are clamped to it.
const PRIORITY_URGENCY_UPPER_BOUND: u8 = 7;
// Urgency used by `Priority::default()` and when the `u` parameter is absent.
const PRIORITY_URGENCY_DEFAULT: u8 = 3;
// Incremental flag used by `Priority::default()`.
const PRIORITY_INCREMENTAL_DEFAULT: bool = false;
// qlog event type passed to `qlog_with_type!` when a frame is sent.
#[cfg(feature = "qlog")]
const QLOG_FRAME_CREATED: EventType =
EventType::Http3EventType(Http3EventType::FrameCreated);
// qlog event type for parsed frames (its users are outside this section).
#[cfg(feature = "qlog")]
const QLOG_FRAME_PARSED: EventType =
EventType::Http3EventType(Http3EventType::FrameParsed);
// qlog event type passed to `qlog_with_type!` when a stream's type is set.
#[cfg(feature = "qlog")]
const QLOG_STREAM_TYPE_SET: EventType =
EventType::Http3EventType(Http3EventType::StreamTypeSet);
/// A specialized [`Result`] type for HTTP/3 operations, using this module's
/// [`Error`].
///
/// [`Result`]: std::result::Result
pub type Result<T> = std::result::Result<T, Error>;
/// An HTTP/3 error.
///
/// Each variant is converted to a numeric wire code by `Error::to_wire()`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Error {
/// There is no work to do (for example nothing to read or write right
/// now). Also produced from the transport's `Done` error.
Done,
/// The provided buffer is too short. Produced from
/// `octets::BufferTooShortError`.
BufferTooShort,
/// Internal error in the HTTP/3 stack.
InternalError,
/// Maps to `WireErrorCode::ExcessiveLoad`.
ExcessiveLoad,
/// A stream or other identifier was used incorrectly, or the stream ID
/// space was exhausted when allocating a new stream.
IdError,
/// Maps to `WireErrorCode::StreamCreationError`.
StreamCreationError,
/// A critical stream (control or QPACK) was closed; the connection is
/// closed when this is detected.
ClosedCriticalStream,
/// Maps to `WireErrorCode::MissingSettings`.
MissingSettings,
/// A frame or operation is not permitted in the current state.
FrameUnexpected,
/// Maps to `WireErrorCode::FrameError`.
FrameError,
/// QPACK header block decompression failed (wire code `0x200`).
QpackDecompressionFailed,
/// An error from the underlying transport connection other than `Done`.
TransportError(crate::Error),
/// The stream does not currently have enough capacity for the operation.
StreamBlocked,
/// Invalid SETTINGS, e.g. duplicate or reserved identifiers passed to
/// `Config::set_additional_settings()`.
SettingsError,
/// Maps to `WireErrorCode::RequestRejected`.
RequestRejected,
/// Maps to `WireErrorCode::RequestCancelled`.
RequestCancelled,
/// Maps to `WireErrorCode::RequestIncomplete`.
RequestIncomplete,
/// Maps to `WireErrorCode::MessageError`.
MessageError,
/// Maps to `WireErrorCode::ConnectError`.
ConnectError,
/// Maps to `WireErrorCode::VersionFallback`.
VersionFallback,
}
/// Numeric HTTP/3 error codes as they are sent on the wire.
///
/// NOTE(review): the values appear to match the HTTP/3 error code registry
/// (RFC 9114, section 8.1) -- confirm when changing any of them.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum WireErrorCode {
/// No error; used for `Error::Done`.
NoError = 0x100,
/// General protocol error.
GeneralProtocolError = 0x101,
/// Internal error.
InternalError = 0x102,
/// Stream creation error.
StreamCreationError = 0x103,
/// A critical stream was closed.
ClosedCriticalStream = 0x104,
/// A frame was received that is not permitted in the current state.
FrameUnexpected = 0x105,
/// Frame error.
FrameError = 0x106,
/// Excessive load.
ExcessiveLoad = 0x107,
/// Identifier error.
IdError = 0x108,
/// Settings error.
SettingsError = 0x109,
/// Missing settings.
MissingSettings = 0x10a,
/// Request rejected.
RequestRejected = 0x10b,
/// Request cancelled.
RequestCancelled = 0x10c,
/// Request incomplete.
RequestIncomplete = 0x10d,
/// Message error.
MessageError = 0x10e,
/// Connect error.
ConnectError = 0x10f,
/// Version fallback.
VersionFallback = 0x110,
}
impl Error {
fn to_wire(self) -> u64 {
match self {
Error::Done => WireErrorCode::NoError as u64,
Error::InternalError => WireErrorCode::InternalError as u64,
Error::StreamCreationError =>
WireErrorCode::StreamCreationError as u64,
Error::ClosedCriticalStream =>
WireErrorCode::ClosedCriticalStream as u64,
Error::FrameUnexpected => WireErrorCode::FrameUnexpected as u64,
Error::FrameError => WireErrorCode::FrameError as u64,
Error::ExcessiveLoad => WireErrorCode::ExcessiveLoad as u64,
Error::IdError => WireErrorCode::IdError as u64,
Error::MissingSettings => WireErrorCode::MissingSettings as u64,
Error::QpackDecompressionFailed => 0x200,
Error::BufferTooShort => 0x999,
Error::TransportError { .. } | Error::StreamBlocked => 0xFF,
Error::SettingsError => WireErrorCode::SettingsError as u64,
Error::RequestRejected => WireErrorCode::RequestRejected as u64,
Error::RequestCancelled => WireErrorCode::RequestCancelled as u64,
Error::RequestIncomplete => WireErrorCode::RequestIncomplete as u64,
Error::MessageError => WireErrorCode::MessageError as u64,
Error::ConnectError => WireErrorCode::ConnectError as u64,
Error::VersionFallback => WireErrorCode::VersionFallback as u64,
}
}
#[cfg(feature = "ffi")]
fn to_c(self) -> libc::ssize_t {
match self {
Error::Done => -1,
Error::BufferTooShort => -2,
Error::InternalError => -3,
Error::ExcessiveLoad => -4,
Error::IdError => -5,
Error::StreamCreationError => -6,
Error::ClosedCriticalStream => -7,
Error::MissingSettings => -8,
Error::FrameUnexpected => -9,
Error::FrameError => -10,
Error::QpackDecompressionFailed => -11,
Error::StreamBlocked => -13,
Error::SettingsError => -14,
Error::RequestRejected => -15,
Error::RequestCancelled => -16,
Error::RequestIncomplete => -17,
Error::MessageError => -18,
Error::ConnectError => -19,
Error::VersionFallback => -20,
Error::TransportError(quic_error) => quic_error.to_c() - 1000,
}
}
}
/// Displays the error using its `Debug` representation.
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Formatting goes through fresh arguments so that any flags set on
        // `f` are not forwarded to the `Debug` implementation.
        f.write_fmt(format_args!("{:?}", self))
    }
}
/// Marks the HTTP/3 [`Error`] as a standard error type.
impl std::error::Error for Error {
// No underlying source is reported, including for `TransportError`.
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl From<super::Error> for Error {
fn from(err: super::Error) -> Self {
match err {
super::Error::Done => Error::Done,
_ => Error::TransportError(err),
}
}
}
/// Maps any `octets` buffer-length failure to [`Error::BufferTooShort`].
impl From<octets::BufferTooShortError> for Error {
    fn from(_: octets::BufferTooShortError) -> Self {
        Self::BufferTooShort
    }
}
/// An HTTP/3 configuration.
///
/// Holds the local settings that a new `Connection` copies and sends in its
/// SETTINGS frame. A field left as `None` has not been configured.
pub struct Config {
// Value for the "max field section size" setting, if configured.
max_field_section_size: Option<u64>,
// Value for the QPACK "max table capacity" setting, if configured.
qpack_max_table_capacity: Option<u64>,
// Value for the QPACK "blocked streams" setting, if configured.
qpack_blocked_streams: Option<u64>,
// `Some(1)` when extended CONNECT has been enabled, `None` otherwise.
connect_protocol_enabled: Option<u64>,
// Extra (identifier, value) pairs validated by `set_additional_settings()`.
additional_settings: Option<Vec<(u64, u64)>>,
}
impl Config {
    /// Creates a new configuration object with every setting left unset.
    pub const fn new() -> Result<Config> {
        let config = Config {
            max_field_section_size: None,
            qpack_max_table_capacity: None,
            qpack_blocked_streams: None,
            connect_protocol_enabled: None,
            additional_settings: None,
        };

        Ok(config)
    }

    /// Sets the "max field section size" setting to `v`.
    pub fn set_max_field_section_size(&mut self, v: u64) {
        self.max_field_section_size = Some(v);
    }

    /// Sets the QPACK "max table capacity" setting to `v`.
    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
        self.qpack_max_table_capacity = Some(v);
    }

    /// Sets the QPACK "blocked streams" setting to `v`.
    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
        self.qpack_blocked_streams = Some(v);
    }

    /// Enables or disables the extended CONNECT setting.
    ///
    /// When enabled the setting is stored as `Some(1)`; when disabled it is
    /// cleared.
    pub fn enable_extended_connect(&mut self, enabled: bool) {
        self.connect_protocol_enabled = if enabled { Some(1) } else { None };
    }

    /// Sets additional (identifier, value) pairs to include in SETTINGS.
    ///
    /// # Errors
    ///
    /// Returns [`Error::SettingsError`] if an identifier appears more than
    /// once, or if an identifier is one of the settings that have a
    /// dedicated field or are otherwise handled explicitly.
    pub fn set_additional_settings(
        &mut self, additional_settings: Vec<(u64, u64)>,
    ) -> Result<()> {
        // Identifiers that must not be supplied through this method.
        let reserved = [
            frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
            frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
            frame::SETTINGS_QPACK_BLOCKED_STREAMS,
            frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
            frame::SETTINGS_H3_DATAGRAM,
            frame::SETTINGS_H3_DATAGRAM_00,
        ];

        // `insert()` returns false for an identifier that was already seen.
        let mut seen: HashSet<u64> =
            HashSet::with_capacity(additional_settings.len());

        let acceptable = additional_settings
            .iter()
            .all(|&(id, _)| !reserved.contains(&id) && seen.insert(id));

        if !acceptable {
            return Err(Error::SettingsError);
        }

        self.additional_settings = Some(additional_settings);

        Ok(())
    }
}
/// A trait for types with associated string name and value, exposed as
/// raw bytes.
pub trait NameValue {
/// Returns the object's name.
fn name(&self) -> &[u8];
/// Returns the object's value.
fn value(&self) -> &[u8];
}
/// Any pair of byte-like values can be used as a name/value pair: the first
/// element is the name and the second is the value.
impl<N, V> NameValue for (N, V)
where
N: AsRef<[u8]>,
V: AsRef<[u8]>,
{
fn name(&self) -> &[u8] {
self.0.as_ref()
}
fn value(&self) -> &[u8] {
self.1.as_ref()
}
}
/// An owned name-value pair representing a raw HTTP header.
///
/// The first field is the name and the second is the value.
#[derive(Clone, PartialEq, Eq)]
pub struct Header(Vec<u8>, Vec<u8>);
/// Writes `hdr` to `f` in a human-readable form.
///
/// Valid UTF-8 is written as text with `str::escape_default()` escaping
/// applied; anything else falls back to the `Debug` output of the byte
/// slice.
fn try_print_as_readable(hdr: &[u8], f: &mut fmt::Formatter) -> fmt::Result {
    match std::str::from_utf8(hdr) {
        // `EscapeDefault` implements `Display`, so it can be written
        // directly without first collecting it into a temporary `String`.
        Ok(s) => write!(f, "{}", s.escape_default()),
        Err(_) => write!(f, "{hdr:?}"),
    }
}
impl fmt::Debug for Header {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_char('"')?;
try_print_as_readable(&self.0, f)?;
f.write_str(": ")?;
try_print_as_readable(&self.1, f)?;
f.write_char('"')
}
}
impl Header {
pub fn new(name: &[u8], value: &[u8]) -> Self {
Self(name.to_vec(), value.to_vec())
}
}
/// Exposes the owned name and value buffers as byte slices.
impl NameValue for Header {
    fn name(&self) -> &[u8] {
        self.0.as_slice()
    }

    fn value(&self) -> &[u8] {
        self.1.as_slice()
    }
}
/// A non-owned name-value pair representing a raw HTTP header.
///
/// The first field borrows the name and the second borrows the value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRef<'a>(&'a [u8], &'a [u8]);
impl<'a> HeaderRef<'a> {
pub const fn new(name: &'a [u8], value: &'a [u8]) -> Self {
Self(name, value)
}
}
/// Exposes the borrowed name and value slices.
impl NameValue for HeaderRef<'_> {
fn name(&self) -> &[u8] {
self.0
}
fn value(&self) -> &[u8] {
self.1
}
}
/// An HTTP/3 connection event, returned by `Connection::poll()` together
/// with the ID of the stream it relates to.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
/// Request/response headers were received.
Headers {
/// The list of received header fields.
list: Vec<Header>,
/// NOTE(review): presumably indicates whether more frames follow the
/// HEADERS frame on the stream -- confirm where the event is built.
more_frames: bool,
},
/// Data was received; NOTE(review): presumably the application is expected
/// to call `recv_body()` to read it -- confirm.
Data,
/// The stream was finished.
Finished,
/// The stream was reset by the peer; carries the error code reported by
/// the transport's `StreamReset` error.
Reset(u64),
/// NOTE(review): presumably signals a received PRIORITY_UPDATE, retrievable
/// with `take_last_priority_update()` -- confirm.
PriorityUpdate,
/// NOTE(review): presumably signals a received GOAWAY -- confirm.
GoAway,
}
/// Extensible priority parameters: an urgency and an incremental flag.
///
/// The urgency is clamped to the supported range when the priority is sent
/// or applied to a stream.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Priority {
// Urgency value; clamped to the lower/upper bound constants on use.
urgency: u8,
// Whether the `i` (incremental) parameter is set.
incremental: bool,
}
/// The default priority uses the module's default urgency and incremental
/// constants.
impl Default for Priority {
    fn default() -> Self {
        Self {
            incremental: PRIORITY_INCREMENTAL_DEFAULT,
            urgency: PRIORITY_URGENCY_DEFAULT,
        }
    }
}
impl Priority {
pub const fn new(urgency: u8, incremental: bool) -> Self {
Priority {
urgency,
incremental,
}
}
}
/// Parses a priority field value (a structured-field dictionary) into a
/// [`Priority`].
///
/// * `u`: integer urgency. A value outside the supported range is replaced
///   by the upper bound; a missing `u` uses the default urgency.
/// * `i`: boolean incremental flag; anything that is not an item is treated
///   as `false`.
///
/// Any parse failure or wrongly typed item yields [`Error::Done`].
#[cfg(feature = "sfv")]
#[cfg_attr(docsrs, doc(cfg(feature = "sfv")))]
impl TryFrom<&[u8]> for Priority {
    type Error = Error;

    fn try_from(value: &[u8]) -> std::result::Result<Self, Self::Error> {
        let dict =
            sfv::Parser::parse_dictionary(value).map_err(|_| Error::Done)?;

        let urgency = match dict.get("u") {
            // Omitted, so use the default value.
            None => PRIORITY_URGENCY_DEFAULT,

            Some(sfv::ListEntry::InnerList(_)) => return Err(Error::Done),

            Some(sfv::ListEntry::Item(item)) => {
                // A non-integer item is an error.
                let v = item.bare_item.as_int().ok_or(Error::Done)?;

                let allowed = PRIORITY_URGENCY_LOWER_BOUND as i64..=
                    PRIORITY_URGENCY_UPPER_BOUND as i64;

                if allowed.contains(&v) {
                    v as u8
                } else {
                    // Out-of-range values fall back to the upper bound.
                    PRIORITY_URGENCY_UPPER_BOUND
                }
            },
        };

        let incremental = match dict.get("i") {
            Some(sfv::ListEntry::Item(item)) =>
                item.bare_item.as_bool().ok_or(Error::Done)?,

            _ => false,
        };

        Ok(Priority::new(urgency, incremental))
    }
}
// The set of HTTP/3 settings for one side of a connection (local or peer).
// A `None` field means the setting is not set / has not been received.
struct ConnectionSettings {
// "Max field section size" setting.
pub max_field_section_size: Option<u64>,
// QPACK "max table capacity" setting.
pub qpack_max_table_capacity: Option<u64>,
// QPACK "blocked streams" setting.
pub qpack_blocked_streams: Option<u64>,
// Extended CONNECT setting; `Some(1)` means enabled.
pub connect_protocol_enabled: Option<u64>,
// HTTP/3 DATAGRAM setting; `Some(1)` means enabled.
pub h3_datagram: Option<u64>,
// Extra (identifier, value) pairs beyond the fields above.
pub additional_settings: Option<Vec<(u64, u64)>>,
// Raw (identifier, value) pairs, exposed for the peer through
// `Connection::peer_settings_raw()`.
pub raw: Option<Vec<(u64, u64)>>,
}
// IDs and byte counters of the QPACK encoder/decoder unidirectional streams
// for one side of the connection.
#[derive(Default)]
struct QpackStreams {
// Stream ID of the QPACK encoder stream, once known/opened.
pub encoder_stream_id: Option<u64>,
// NOTE(review): presumably the number of bytes seen on the encoder stream;
// the code updating it is outside this section -- confirm.
pub encoder_stream_bytes: u64,
// Stream ID of the QPACK decoder stream, once known/opened.
pub decoder_stream_id: Option<u64>,
// NOTE(review): presumably the number of bytes seen on the decoder stream;
// the code updating it is outside this section -- confirm.
pub decoder_stream_bytes: u64,
}
/// Statistics about an HTTP/3 connection.
#[derive(Clone, Default)]
pub struct Stats {
/// The number of bytes received on the QPACK encoder stream.
pub qpack_encoder_stream_recv_bytes: u64,
/// The number of bytes received on the QPACK decoder stream.
pub qpack_decoder_stream_recv_bytes: u64,
}
fn close_conn_critical_stream<F: BufFactory>(
conn: &mut super::Connection<F>,
) -> Result<()> {
conn.close(
true,
Error::ClosedCriticalStream.to_wire(),
b"Critical stream closed.",
)?;
Err(Error::ClosedCriticalStream)
}
/// Closes the connection if the given critical stream has finished.
///
/// Returns `Ok(())` when the stream is not finished; otherwise returns the
/// result of `close_conn_critical_stream()`.
fn close_conn_if_critical_stream_finished<F: BufFactory>(
    conn: &mut super::Connection<F>, stream_id: u64,
) -> Result<()> {
    if !conn.stream_finished(stream_id) {
        return Ok(());
    }

    close_conn_critical_stream(conn)
}
/// An HTTP/3 connection, layered on top of a transport connection that is
/// passed to each method.
pub struct Connection {
// Whether this endpoint is the server side.
is_server: bool,
// ID to use for the next locally-initiated request stream; advanced by 4
// after each successful `send_request()`.
next_request_stream_id: u64,
// ID to use for the next locally-initiated unidirectional stream; starts at
// 0x3 for servers and 0x2 for clients, advanced by 4 per opened stream.
next_uni_stream_id: u64,
// Per-stream HTTP/3 state, keyed by stream ID.
streams: crate::stream::StreamIdHashMap<stream::Stream>,
// Settings this endpoint sends in its SETTINGS frame.
local_settings: ConnectionSettings,
// Settings received from the peer.
peer_settings: ConnectionSettings,
// ID of the locally-opened control stream, once opened.
control_stream_id: Option<u64>,
// ID of the peer's control stream, once known.
peer_control_stream_id: Option<u64>,
// QPACK encoder used to build header blocks.
qpack_encoder: qpack::Encoder,
// QPACK decoder (used by code outside this section).
qpack_decoder: qpack::Decoder,
// Locally-opened QPACK encoder/decoder streams.
local_qpack_streams: QpackStreams,
// Peer-opened QPACK encoder/decoder streams.
peer_qpack_streams: QpackStreams,
// NOTE(review): presumably the maximum push ID; only initialised to 0 in
// this section -- confirm usage.
max_push_id: u64,
// Streams whose `Finished` event is waiting to be returned by `poll()`.
finished_streams: VecDeque<u64>,
// Whether GREASE frames have already been sent on this connection.
frames_greased: bool,
// ID carried by the last GOAWAY sent locally, if any.
local_goaway_id: Option<u64>,
// ID carried by a GOAWAY received from the peer, if any.
peer_goaway_id: Option<u64>,
}
impl Connection {
fn new(
config: &Config, is_server: bool, enable_dgram: bool,
) -> Result<Connection> {
let initial_uni_stream_id = if is_server { 0x3 } else { 0x2 };
let h3_datagram = if enable_dgram { Some(1) } else { None };
Ok(Connection {
is_server,
next_request_stream_id: 0,
next_uni_stream_id: initial_uni_stream_id,
streams: Default::default(),
local_settings: ConnectionSettings {
max_field_section_size: config.max_field_section_size,
qpack_max_table_capacity: config.qpack_max_table_capacity,
qpack_blocked_streams: config.qpack_blocked_streams,
connect_protocol_enabled: config.connect_protocol_enabled,
h3_datagram,
additional_settings: config.additional_settings.clone(),
raw: Default::default(),
},
peer_settings: ConnectionSettings {
max_field_section_size: None,
qpack_max_table_capacity: None,
qpack_blocked_streams: None,
h3_datagram: None,
connect_protocol_enabled: None,
additional_settings: Default::default(),
raw: Default::default(),
},
control_stream_id: None,
peer_control_stream_id: None,
qpack_encoder: qpack::Encoder::new(),
qpack_decoder: qpack::Decoder::new(),
local_qpack_streams: Default::default(),
peer_qpack_streams: Default::default(),
max_push_id: 0,
finished_streams: VecDeque::new(),
frames_greased: false,
local_goaway_id: None,
peer_goaway_id: None,
})
}
/// Creates a new HTTP/3 connection using the provided transport connection.
///
/// This opens the control stream and sends SETTINGS, then makes a
/// best-effort attempt to open the QPACK streams and, when the transport has
/// GREASE enabled, a GREASE stream.
///
/// # Errors
///
/// * [`Error::InternalError`] on a client whose transport connection is
///   neither established nor in early data.
/// * Any error from sending SETTINGS; in that case the transport connection
///   is closed with the matching wire code first.
pub fn with_transport<F: BufFactory>(
    conn: &mut super::Connection<F>, config: &Config,
) -> Result<Connection> {
    // Only clients have to wait for the handshake (or early data).
    if !conn.is_server && !conn.is_established() && !conn.is_in_early_data() {
        trace!("{} QUIC connection must be established or in early data before creating an HTTP/3 connection", conn.trace_id());
        return Err(Error::InternalError);
    }

    let mut http3_conn =
        Connection::new(config, conn.is_server, conn.dgram_enabled())?;

    if let Err(e) = http3_conn.send_settings(conn) {
        conn.close(true, e.to_wire(), b"Error opening control stream")?;
        return Err(e);
    }

    // Failures to open the QPACK streams are deliberately ignored.
    let _ = http3_conn.open_qpack_encoder_stream(conn);
    let _ = http3_conn.open_qpack_decoder_stream(conn);

    if conn.grease {
        // Failure to open a GREASE stream is deliberately ignored too.
        let _ = http3_conn.open_grease_stream(conn);
    }

    Ok(http3_conn)
}
pub fn send_request<T: NameValue, F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, headers: &[T], fin: bool,
) -> Result<u64> {
if self.peer_goaway_id.is_some() {
return Err(Error::FrameUnexpected);
}
let stream_id = self.next_request_stream_id;
self.streams
.insert(stream_id, <stream::Stream>::new(stream_id, true));
if let Err(e) = conn.stream_send(stream_id, b"", false) {
self.streams.remove(&stream_id);
if e == super::Error::Done {
return Err(Error::StreamBlocked);
}
return Err(e.into());
};
self.send_headers(conn, stream_id, headers, fin)?;
self.next_request_stream_id = self
.next_request_stream_id
.checked_add(4)
.ok_or(Error::IdError)?;
Ok(stream_id)
}
pub fn send_response<T: NameValue, F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
headers: &[T], fin: bool,
) -> Result<()> {
let priority = Default::default();
self.send_response_with_priority(
conn, stream_id, headers, &priority, fin,
)?;
Ok(())
}
pub fn send_response_with_priority<T: NameValue, F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
headers: &[T], priority: &Priority, fin: bool,
) -> Result<()> {
match self.streams.get(&stream_id) {
Some(s) => {
if s.local_initialized() {
return Err(Error::FrameUnexpected);
}
s
},
None => return Err(Error::FrameUnexpected),
};
self.send_headers(conn, stream_id, headers, fin)?;
let urgency = priority
.urgency
.clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND) +
PRIORITY_URGENCY_OFFSET;
conn.stream_priority(stream_id, urgency, priority.incremental)?;
Ok(())
}
pub fn send_additional_headers<T: NameValue, F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
headers: &[T], is_trailer_section: bool, fin: bool,
) -> Result<()> {
if !self.is_server && !is_trailer_section {
return Err(Error::FrameUnexpected);
}
match self.streams.get(&stream_id) {
Some(s) => {
if !s.local_initialized() {
return Err(Error::FrameUnexpected);
}
if s.trailers_sent() {
return Err(Error::FrameUnexpected);
}
s
},
None => return Err(Error::FrameUnexpected),
};
self.send_headers(conn, stream_id, headers, fin)?;
if is_trailer_section {
if let Some(s) = self.streams.get_mut(&stream_id) {
s.mark_trailers_sent();
}
}
Ok(())
}
/// Sends additional HTTP/3 headers and then applies `priority` to the
/// stream.
///
/// The headers are sent through `send_additional_headers()`, whose errors
/// are passed through. The urgency is clamped to the supported range and
/// shifted by the module's urgency offset before it is handed to the
/// transport.
pub fn send_additional_headers_with_priority<T: NameValue, F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>, stream_id: u64,
    headers: &[T], priority: &Priority, is_trailer_section: bool, fin: bool,
) -> Result<()> {
    self.send_additional_headers(
        conn,
        stream_id,
        headers,
        is_trailer_section,
        fin,
    )?;

    let clamped = priority
        .urgency
        .clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
    let urgency = clamped + PRIORITY_URGENCY_OFFSET;

    conn.stream_priority(stream_id, urgency, priority.incremental)?;

    Ok(())
}
fn encode_header_block<T: NameValue>(
&mut self, headers: &[T],
) -> Result<Vec<u8>> {
let headers_len = headers
.iter()
.fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
let mut header_block = vec![0; headers_len];
let len = self
.qpack_encoder
.encode(headers, &mut header_block)
.map_err(|_| Error::InternalError)?;
header_block.truncate(len);
Ok(header_block)
}
/// Encodes `headers` and sends them as a single HEADERS frame on
/// `stream_id`, setting the stream's fin flag when `fin` is true.
///
/// Returns [`Error::StreamBlocked`] when the transport reports that the
/// stream cannot take the whole frame; transport errors are passed through.
fn send_headers<T: NameValue, F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
headers: &[T], fin: bool,
) -> Result<()> {
// Scratch space for the frame header (type and length varints).
let mut d = [42; 10];
let mut b = octets::OctetsMut::with_slice(&mut d);
// GREASE frames are sent at most once per connection, ahead of the first
// HEADERS frame, and only when the transport has GREASE enabled.
if !self.frames_greased && conn.grease {
self.send_grease_frames(conn, stream_id)?;
self.frames_greased = true;
}
let header_block = self.encode_header_block(headers)?;
let overhead = octets::varint_len(frame::HEADERS_FRAME_TYPE_ID) +
octets::varint_len(header_block.len() as u64);
// Check up front that the frame header plus the whole header block fit.
match conn.stream_writable(stream_id, overhead + header_block.len()) {
Ok(true) => (),
Ok(false) => return Err(Error::StreamBlocked),
Err(e) => {
// Drop local state for a stream the transport considers finished.
if conn.stream_finished(stream_id) {
self.streams.remove(&stream_id);
}
return Err(e.into());
},
};
b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
b.put_varint(header_block.len() as u64)?;
let off = b.off();
// The frame header and the header block are written separately; only the
// second write carries the fin flag.
conn.stream_send(stream_id, &d[..off], false)?;
conn.stream_send(stream_id, &header_block, fin)?;
trace!(
"{} tx frm HEADERS stream={} len={} fin={}",
conn.trace_id(),
stream_id,
header_block.len(),
fin
);
qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
let qlog_headers = headers
.iter()
.map(|h| qlog::events::http3::HttpHeader {
name: Some(String::from_utf8_lossy(h.name()).into_owned()),
name_bytes: None,
value: Some(String::from_utf8_lossy(h.value()).into_owned()),
value_bytes: None,
})
.collect();
let frame = Http3Frame::Headers {
headers: qlog_headers,
raw: None,
};
let ev_data = EventData::Http3FrameCreated(FrameCreated {
stream_id,
length: Some(header_block.len() as u64),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
// Record that headers have now been sent locally on this stream.
if let Some(s) = self.streams.get_mut(&stream_id) {
s.initialize_local();
}
// Once fin was sent and the transport reports the stream finished, the
// local stream state is no longer needed.
if fin && conn.stream_finished(stream_id) {
self.streams.remove(&stream_id);
}
Ok(())
}
/// Sends an HTTP/3 body chunk on the given stream.
///
/// On success the number of body bytes written is returned, which can be
/// lower than `body.len()` when the stream lacks capacity.
///
/// All validation and capacity handling is done by `do_send_body()`; this
/// method only supplies the write step, which sends the DATA frame header
/// followed by the first `body_len` bytes of the body.
pub fn send_body<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>, stream_id: u64, body: &[u8],
    fin: bool,
) -> Result<usize> {
    let write_frame = |conn: &mut super::Connection<F>,
                       header: &[u8],
                       stream_id: u64,
                       body: &[u8],
                       body_len: usize,
                       fin: bool| {
        conn.stream_send(stream_id, header, false)?;

        let sent = conn.stream_send(stream_id, &body[..body_len], fin)?;

        Ok((sent, sent))
    };

    self.do_send_body(conn, stream_id, body, fin, write_frame)
}
/// Sends an HTTP/3 body chunk from a buffer of the connection's buffer
/// factory type, using the transport's `stream_send_zc()`.
///
/// On success the number of body bytes written (excluding the DATA frame
/// header) is returned. If the transport hands back a remainder buffer,
/// `body` is replaced with it.
pub fn send_body_zc<F>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
body: &mut F::Buf, fin: bool,
) -> Result<usize>
where
F: BufFactory,
F::Buf: BufSplit,
{
self.do_send_body(
conn,
stream_id,
body,
fin,
|conn: &mut super::Connection<F>,
header: &[u8],
stream_id: u64,
body: &mut F::Buf,
mut body_len: usize,
fin: bool| {
// Try to attach the frame header to the buffer itself; if that is not
// possible the header is sent as a separate write.
let with_prefix = body.try_add_prefix(header);
if !with_prefix {
conn.stream_send(stream_id, header, false)?;
} else {
// The header is now part of the buffer, so it counts towards the
// number of bytes to send.
body_len += header.len();
}
let (mut n, rem) = conn.stream_send_zc(
stream_id,
body.clone(),
Some(body_len),
fin,
)?;
// Report body bytes only, without the prefixed header.
if with_prefix {
n -= header.len();
}
if let Some(rem) = rem {
let _ = std::mem::replace(body, rem);
}
Ok((n, n))
},
)
}
/// Shared implementation of the body-sending methods.
///
/// Validates the stream, caps the DATA frame payload to the stream's
/// capacity, builds the frame header and delegates the actual write to
/// `write_fn`, which receives the connection, the encoded frame header, the
/// stream ID, the body, the capped body length and the effective fin flag.
/// `write_fn` returns the number of body bytes written and the value this
/// function returns on success.
///
/// # Errors
///
/// * [`Error::FrameUnexpected`] if the stream ID is not a multiple of 4, the
///   stream is unknown, headers were not sent on it yet, or trailers were
///   already sent.
/// * [`Error::Done`] if there is nothing to send (empty body without fin) or
///   the stream has no room for the frame header plus payload.
fn do_send_body<F, B, R, SND>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64, body: B,
fin: bool, write_fn: SND,
) -> Result<R>
where
F: BufFactory,
B: AsRef<[u8]>,
SND: FnOnce(
&mut super::Connection<F>,
&[u8],
u64,
B,
usize,
bool,
) -> Result<(usize, R)>,
{
// Scratch space for the frame header (type and length varints).
let mut d = [42; 10];
let mut b = octets::OctetsMut::with_slice(&mut d);
let len = body.as_ref().len();
// Body data is only accepted on stream IDs that are multiples of 4.
// NOTE(review): i.e. client-initiated bidirectional streams in QUIC's
// numbering -- confirm.
if stream_id % 4 != 0 {
return Err(Error::FrameUnexpected);
}
match self.streams.get_mut(&stream_id) {
Some(s) => {
// Headers must have been sent before any body.
if !s.local_initialized() {
return Err(Error::FrameUnexpected);
}
// Nothing may follow the trailers.
if s.trailers_sent() {
return Err(Error::FrameUnexpected);
}
},
None => {
return Err(Error::FrameUnexpected);
},
};
// Avoid sending empty DATA frames unless they carry the fin flag.
if len == 0 && !fin {
return Err(Error::Done);
}
let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
octets::varint_len(len as u64);
let stream_cap = match conn.stream_capacity(stream_id) {
Ok(v) => v,
Err(e) => {
// Drop local state for a stream the transport considers finished.
if conn.stream_finished(stream_id) {
self.streams.remove(&stream_id);
}
return Err(e.into());
},
};
// Not even the frame header fits. The result of stream_writable() is
// deliberately ignored; NOTE(review): presumably the call registers
// interest in the stream becoming writable -- confirm in the transport.
if stream_cap < overhead {
let _ = conn.stream_writable(stream_id, overhead + 1);
return Err(Error::Done);
}
// Cap the payload to what fits after the frame header.
let body_len = std::cmp::min(len, stream_cap - overhead);
// A partial write must not finish the stream.
let fin = if body_len != len { false } else { fin };
if body_len == 0 && !fin {
let _ = conn.stream_writable(stream_id, overhead + 1);
return Err(Error::Done);
}
b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
b.put_varint(body_len as u64)?;
let off = b.off();
let (written, ret) =
write_fn(conn, &d[..off], stream_id, body, body_len, fin)?;
trace!(
"{} tx frm DATA stream={} len={} fin={}",
conn.trace_id(),
stream_id,
written,
fin
);
qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
let frame = Http3Frame::Data { raw: None };
let ev_data = EventData::Http3FrameCreated(FrameCreated {
stream_id,
length: Some(written as u64),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
// Part of the body is still pending; same ignored stream_writable() call
// as above.
if written < len {
let _ = conn.stream_writable(stream_id, overhead + 1);
}
// Everything was written with fin and the transport reports the stream
// finished, so the local stream state is no longer needed.
if fin && written == len && conn.stream_finished(stream_id) {
self.streams.remove(&stream_id);
}
Ok(ret)
}
/// Returns whether the peer enabled HTTP/3 DATAGRAM frame support.
///
/// This is true only when the peer's DATAGRAM setting is `Some(1)` and the
/// transport reports a maximum writable datagram length.
pub fn dgram_enabled_by_peer<F: BufFactory>(
    &self, conn: &super::Connection<F>,
) -> bool {
    let peer_opted_in = matches!(self.peer_settings.h3_datagram, Some(1));

    peer_opted_in && conn.dgram_max_writable_len().is_some()
}
/// Returns whether the peer enabled extended CONNECT support, i.e. whether
/// its extended CONNECT setting is `Some(1)`.
pub fn extended_connect_enabled_by_peer(&self) -> bool {
    matches!(self.peer_settings.connect_protocol_enabled, Some(1))
}
/// Reads request or response body data into the provided buffer.
///
/// This is a thin wrapper that forwards `out` to `recv_body_buf()`; see that
/// method for the return value and errors.
pub fn recv_body<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
out: &mut [u8],
) -> Result<usize> {
self.recv_body_buf(conn, stream_id, out)
}
/// Reads request or response body data into `out`, a `bytes::BufMut`.
///
/// Keeps reading while `out` has room and the stream is in the data state.
/// On success the total number of bytes read is returned.
///
/// # Errors
///
/// [`Error::Done`] is returned when the stream is unknown or no bytes were
/// read; other errors from reading or stream processing are passed through.
pub fn recv_body_buf<F: BufFactory, OUT: bytes::BufMut>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64, mut out: OUT,
) -> Result<usize> {
let mut total = 0;
while out.has_remaining_mut() {
let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
// Body bytes can only be consumed while the stream is in the data state.
if stream.state() != stream::State::Data {
break;
}
let (read, fin) = match stream.try_consume_data(conn, &mut out) {
Ok(v) => v,
Err(Error::Done) => break,
Err(e) => return Err(e),
};
total += read;
// Nothing more to read right now, or the stream ended.
if read == 0 || fin {
break;
}
// Let the stream state advance before the next iteration. With
// `polling == false` this call is expected never to return an event,
// hence the unreachable!().
match self.process_readable_stream(conn, stream_id, false) {
Ok(_) => unreachable!(),
Err(Error::Done) => (),
Err(e) => return Err(e),
};
if conn.stream_finished(stream_id) {
break;
}
}
// Hand the stream to process_finished_stream() once the transport reports
// it finished.
if conn.stream_finished(stream_id) {
self.process_finished_stream(stream_id);
}
if total == 0 {
return Err(Error::Done);
}
Ok(total)
}
/// Sends a PRIORITY_UPDATE frame for the request stream `stream_id` on the
/// local control stream.
///
/// The priority field value is `u=<urgency>` (urgency clamped to the
/// supported range), followed by `,i` when `priority.incremental` is set.
///
/// # Errors
///
/// * [`Error::FrameUnexpected`] if called on a server, if `stream_id` is not
///   a multiple of 4, or if the control stream has not been opened.
/// * [`Error::StreamBlocked`] if the control stream cannot take the frame.
/// * Transport errors are passed through.
pub fn send_priority_update_for_request<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
priority: &Priority,
) -> Result<()> {
// Scratch space for the frame type, length and prioritized element ID.
let mut d = [42; 20];
let mut b = octets::OctetsMut::with_slice(&mut d);
if self.is_server {
return Err(Error::FrameUnexpected);
}
if stream_id % 4 != 0 {
return Err(Error::FrameUnexpected);
}
let control_stream_id =
self.control_stream_id.ok_or(Error::FrameUnexpected)?;
let urgency = priority
.urgency
.clamp(PRIORITY_URGENCY_LOWER_BOUND, PRIORITY_URGENCY_UPPER_BOUND);
let mut field_value = format!("u={urgency}");
if priority.incremental {
field_value.push_str(",i");
}
let priority_field_value = field_value.as_bytes();
// The frame payload is the prioritized stream ID followed by the field
// value.
let frame_payload_len =
octets::varint_len(stream_id) + priority_field_value.len();
let overhead =
octets::varint_len(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID) +
octets::varint_len(stream_id) +
octets::varint_len(frame_payload_len as u64);
// Check up front that the whole frame fits on the control stream.
match conn.stream_writable(
control_stream_id,
overhead + priority_field_value.len(),
) {
Ok(true) => (),
Ok(false) => return Err(Error::StreamBlocked),
Err(e) => {
return Err(e.into());
},
}
b.put_varint(frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID)?;
b.put_varint(frame_payload_len as u64)?;
b.put_varint(stream_id)?;
let off = b.off();
// The fixed part of the frame and the field value are written separately.
conn.stream_send(control_stream_id, &d[..off], false)?;
conn.stream_send(control_stream_id, priority_field_value, false)?;
trace!(
"{} tx frm PRIORITY_UPDATE request_stream={} priority_field_value={}",
conn.trace_id(),
stream_id,
field_value,
);
qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
let frame = Http3Frame::PriorityUpdate {
stream_id: Some(stream_id),
push_id: None,
priority_field_value: field_value.clone(),
raw: None,
};
// NOTE(review): the event records the request stream ID, not the control
// stream the frame was written to -- confirm this is intended.
let ev_data = EventData::Http3FrameCreated(FrameCreated {
stream_id,
length: Some(priority_field_value.len() as u64),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
Ok(())
}
pub fn take_last_priority_update(
&mut self, prioritized_element_id: u64,
) -> Result<Vec<u8>> {
if let Some(stream) = self.streams.get_mut(&prioritized_element_id) {
return stream.take_last_priority_update().ok_or(Error::Done);
}
Err(Error::Done)
}
/// Processes HTTP/3 data received from the peer.
///
/// On success it returns an [`Event`] together with the ID of the stream it
/// belongs to. [`Error::Done`] is returned when there is nothing to report.
///
/// The peer's control and QPACK streams are processed first, then pending
/// `Finished` events, then the transport's readable streams.
pub fn poll<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>,
) -> Result<(u64, Event)> {
// Nothing is processed once the connection has a local error set.
if conn.local_error.is_some() {
return Err(Error::Done);
}
// Critical streams first: control, then QPACK encoder, then QPACK decoder.
if let Some(stream_id) = self.peer_control_stream_id {
match self.process_control_stream(conn, stream_id) {
Ok(ev) => return Ok(ev),
Err(Error::Done) => (),
Err(e) => return Err(e),
};
}
if let Some(stream_id) = self.peer_qpack_streams.encoder_stream_id {
match self.process_control_stream(conn, stream_id) {
Ok(ev) => return Ok(ev),
Err(Error::Done) => (),
Err(e) => return Err(e),
};
}
if let Some(stream_id) = self.peer_qpack_streams.decoder_stream_id {
match self.process_control_stream(conn, stream_id) {
Ok(ev) => return Ok(ev),
Err(Error::Done) => (),
Err(e) => return Err(e),
};
}
// Report streams that were already queued as finished.
if let Some(finished) = self.finished_streams.pop_front() {
return Ok((finished, Event::Finished));
}
for s in conn.readable() {
trace!("{} stream id {} is readable", conn.trace_id(), s);
let ev = match self.process_readable_stream(conn, s, true) {
Ok(v) => Some(v),
Err(Error::Done) => None,
// A reset is reported right away as a `Reset` event; the finished
// check below is skipped for this stream.
Err(Error::TransportError(crate::Error::StreamReset(e))) =>
return Ok((s, Event::Reset(e))),
Err(e) => return Err(e),
};
if conn.stream_finished(s) {
self.process_finished_stream(s);
}
if let Some(ev) = ev {
return Ok(ev);
}
}
// The loop above may have queued newly finished streams.
if let Some(finished) = self.finished_streams.pop_front() {
// A finished stream that is still readable is read with an empty buffer;
// if that reports a stream reset, a `Reset` event is returned instead of
// `Finished`.
if conn.stream_readable(finished) {
if let Err(crate::Error::StreamReset(e)) =
conn.stream_recv(finished, &mut [])
{
return Ok((finished, Event::Reset(e)));
}
}
return Ok((finished, Event::Finished));
}
Err(Error::Done)
}
/// Sends a GOAWAY frame on the local control stream.
///
/// Clients always send ID 0, whatever `id` is. If the control stream has
/// not been opened, nothing is sent and `Ok(())` is returned.
///
/// # Errors
///
/// * [`Error::IdError`] if a server passes an `id` that is not a multiple
///   of 4, or if `id` is greater than the ID of a previously sent GOAWAY.
/// * [`Error::StreamBlocked`] if the control stream lacks capacity for the
///   frame.
/// * Transport and encoding errors are passed through.
pub fn send_goaway<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>, id: u64,
) -> Result<()> {
    let id = if self.is_server { id } else { 0 };

    if self.is_server && id % 4 != 0 {
        return Err(Error::IdError);
    }

    // The ID of a later GOAWAY must not exceed the one already sent.
    if matches!(self.local_goaway_id, Some(sent_id) if id > sent_id) {
        return Err(Error::IdError);
    }

    let stream_id = match self.control_stream_id {
        Some(v) => v,
        None => return Ok(()),
    };

    let mut d = [42; 10];
    let mut b = octets::OctetsMut::with_slice(&mut d);

    let frame = frame::Frame::GoAway { id };

    let wire_len = frame.to_bytes(&mut b)?;
    let stream_cap = conn.stream_capacity(stream_id)?;

    if stream_cap < wire_len {
        return Err(Error::StreamBlocked);
    }

    trace!("{} tx frm {:?}", conn.trace_id(), frame);

    qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
        let ev_data = EventData::Http3FrameCreated(FrameCreated {
            stream_id,
            length: Some(octets::varint_len(id) as u64),
            frame: frame.to_qlog(),
            ..Default::default()
        });

        q.add_event_data_now(ev_data).ok();
    });

    let off = b.off();
    conn.stream_send(stream_id, &d[..off], false)?;

    // Remember the ID only once the frame has been handed to the transport.
    self.local_goaway_id = Some(id);

    Ok(())
}
/// Gets the raw settings from the peer as (identifier, value) pairs.
///
/// Returns `None` when no raw peer settings have been stored.
pub fn peer_settings_raw(&self) -> Option<&[(u64, u64)]> {
self.peer_settings.raw.as_deref()
}
/// Opens a new unidirectional stream of type `ty` and returns its ID.
///
/// Control and QPACK streams are given urgency 0, push streams keep the
/// transport's priority, and any other type is given urgency 255. The next
/// unidirectional stream ID is only advanced after the stream type has been
/// written successfully.
///
/// # Errors
///
/// * [`Error::IdError`] if the stream ID space is exhausted.
/// * Transport and encoding errors are passed through.
fn open_uni_stream<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>, ty: u64,
) -> Result<u64> {
    let stream_id = self.next_uni_stream_id;

    let urgency = match ty {
        stream::HTTP3_CONTROL_STREAM_TYPE_ID |
        stream::QPACK_ENCODER_STREAM_TYPE_ID |
        stream::QPACK_DECODER_STREAM_TYPE_ID => Some(0),

        stream::HTTP3_PUSH_STREAM_TYPE_ID => None,

        _ => Some(255),
    };

    if let Some(urgency) = urgency {
        conn.stream_priority(stream_id, urgency, false)?;
    }

    // The stream starts with its type encoded as a varint.
    let mut type_buf = [0; 8];
    let mut b = octets::OctetsMut::with_slice(&mut type_buf);
    conn.stream_send(stream_id, b.put_varint(ty)?, false)?;

    let following_id = self
        .next_uni_stream_id
        .checked_add(4)
        .ok_or(Error::IdError)?;
    self.next_uni_stream_id = following_id;

    Ok(stream_id)
}
/// Opens the local QPACK encoder stream and records its ID.
///
/// Errors from `open_uni_stream()` are passed through.
fn open_qpack_encoder_stream<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>,
) -> Result<()> {
    let ty = stream::QPACK_ENCODER_STREAM_TYPE_ID;
    let stream_id = self.open_uni_stream(conn, ty)?;

    self.local_qpack_streams.encoder_stream_id = Some(stream_id);

    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
        let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
            stream_id,
            initiator: Some(Initiator::Local),
            stream_type: StreamType::QpackEncode,
            ..Default::default()
        });

        q.add_event_data_now(ev_data).ok();
    });

    Ok(())
}
/// Opens the local QPACK decoder stream and records its ID.
///
/// Errors from `open_uni_stream()` are passed through.
fn open_qpack_decoder_stream<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>,
) -> Result<()> {
    let ty = stream::QPACK_DECODER_STREAM_TYPE_ID;
    let stream_id = self.open_uni_stream(conn, ty)?;

    self.local_qpack_streams.decoder_stream_id = Some(stream_id);

    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
        let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
            stream_id,
            initiator: Some(Initiator::Local),
            stream_type: StreamType::QpackDecode,
            ..Default::default()
        });

        q.add_event_data_now(ev_data).ok();
    });

    Ok(())
}
/// Sends two GREASE frames on `stream_id`: one with an empty payload and one
/// carrying a fixed 18-byte payload.
///
/// If the stream lacks capacity for both frames, nothing is sent and
/// `Ok(())` is returned. Transport errors are passed through.
fn send_grease_frames<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
) -> Result<()> {
// Scratch space reused for every varint written below.
let mut d = [0; 8];
let stream_cap = match conn.stream_capacity(stream_id) {
Ok(v) => v,
Err(e) => {
// Drop local state for a stream the transport considers finished.
if conn.stream_finished(stream_id) {
self.streams.remove(&stream_id);
}
return Err(e.into());
},
};
let grease_frame1 = grease_value();
let grease_frame2 = grease_value();
let grease_payload = b"GREASE is the word";
// Total size: both frame types, a 1-byte length for each frame, and the
// payload of the second frame.
let overhead = octets::varint_len(grease_frame1) + 1 + octets::varint_len(grease_frame2) + 1 + grease_payload.len();
// Without room for both frames, skip GREASE silently.
if stream_cap < overhead {
return Ok(());
}
// First frame: GREASE type followed by a zero length.
let mut b = octets::OctetsMut::with_slice(&mut d);
conn.stream_send(stream_id, b.put_varint(grease_frame1)?, false)?;
let mut b = octets::OctetsMut::with_slice(&mut d);
conn.stream_send(stream_id, b.put_varint(0)?, false)?;
trace!(
"{} tx frm GREASE stream={} len=0",
conn.trace_id(),
stream_id
);
qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
let frame = Http3Frame::Reserved {
frame_type_bytes: grease_frame1,
raw: None,
};
let ev_data = EventData::Http3FrameCreated(FrameCreated {
stream_id,
length: Some(0),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
// Second frame: GREASE type, length and payload. The literal 18 is the
// length of `grease_payload` and must be kept in sync with it.
let mut b = octets::OctetsMut::with_slice(&mut d);
conn.stream_send(stream_id, b.put_varint(grease_frame2)?, false)?;
let mut b = octets::OctetsMut::with_slice(&mut d);
conn.stream_send(stream_id, b.put_varint(18)?, false)?;
conn.stream_send(stream_id, grease_payload, false)?;
trace!(
"{} tx frm GREASE stream={} len={}",
conn.trace_id(),
stream_id,
grease_payload.len()
);
qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
let frame = Http3Frame::Reserved {
frame_type_bytes: grease_frame2,
raw: None,
};
let ev_data = EventData::Http3FrameCreated(FrameCreated {
stream_id,
length: Some(grease_payload.len() as u64),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
Ok(())
}
/// Opens a unidirectional stream with a GREASE stream type, writes a short
/// payload on it and finishes it.
///
/// An [`Error::IdError`] while opening the stream is logged and treated as
/// success; any other error is passed through.
fn open_grease_stream<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>,
) -> Result<()> {
    let ty = grease_value();

    let stream_id = match self.open_uni_stream(conn, ty) {
        Ok(v) => v,

        Err(Error::IdError) => {
            trace!("{} GREASE stream blocked", conn.trace_id(),);

            return Ok(());
        },

        Err(e) => return Err(e),
    };

    conn.stream_send(stream_id, b"GREASE is the word", true)?;

    trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);

    qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
        let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
            stream_id,
            initiator: Some(Initiator::Local),
            stream_type: StreamType::Unknown,
            stream_type_bytes: Some(ty),
            ..Default::default()
        });

        q.add_event_data_now(ev_data).ok();
    });

    Ok(())
}
/// Opens the local control stream and sends the SETTINGS frame built from
/// the local settings on it.
///
/// # Errors
///
/// An [`Error::Done`] while opening the control stream is reported as
/// [`Error::InternalError`]; other errors are passed through.
fn send_settings<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>,
) -> Result<()> {
let stream_id = match self
.open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)
{
Ok(v) => v,
Err(e) => {
trace!("{} Control stream blocked", conn.trace_id(),);
// `Done` is replaced so the caller gets a real error.
if e == Error::Done {
return Err(Error::InternalError);
}
return Err(e);
},
};
self.control_stream_id = Some(stream_id);
qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
let ev_data = EventData::Http3StreamTypeSet(StreamTypeSet {
stream_id,
initiator: Some(Initiator::Local),
stream_type: StreamType::Control,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
// When the transport has GREASE enabled, add a GREASE setting built from
// two generated values.
let grease = if conn.grease {
Some((grease_value(), grease_value()))
} else {
None
};
let frame = frame::Frame::Settings {
max_field_section_size: self.local_settings.max_field_section_size,
qpack_max_table_capacity: self
.local_settings
.qpack_max_table_capacity,
qpack_blocked_streams: self.local_settings.qpack_blocked_streams,
connect_protocol_enabled: self
.local_settings
.connect_protocol_enabled,
h3_datagram: self.local_settings.h3_datagram,
grease,
additional_settings: self.local_settings.additional_settings.clone(),
raw: Default::default(),
};
// The frame is serialized into a fixed 128-byte buffer; a frame that does
// not fit makes `to_bytes()` fail and the error is returned.
let mut d = [42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
frame.to_bytes(&mut b)?;
let off = b.off();
// `control_stream_id` was set above, so this branch is always taken here.
if let Some(id) = self.control_stream_id {
conn.stream_send(id, &d[..off], false)?;
trace!(
"{} tx frm SETTINGS stream={} len={}",
conn.trace_id(),
id,
off
);
qlog_with_type!(QLOG_FRAME_CREATED, conn.qlog, q, {
let frame = frame.to_qlog();
let ev_data = EventData::Http3FrameCreated(FrameCreated {
stream_id: id,
length: Some(off as u64),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
}
Ok(())
}
/// Processes pending data on a critical stream identified by `stream_id`.
///
/// The critical-stream-finished check runs both before and after the
/// stream is read. Returns the event produced by
/// `process_readable_stream()`, or `Error::Done` when the stream is not
/// readable or reading it produced no event.
fn process_control_stream<F: BufFactory>(
    &mut self, conn: &mut super::Connection<F>, stream_id: u64,
) -> Result<(u64, Event)> {
    close_conn_if_critical_stream_finished(conn, stream_id)?;

    if !conn.stream_readable(stream_id) {
        return Err(Error::Done);
    }

    match self.process_readable_stream(conn, stream_id, true) {
        // Nothing to report; fall through to the final check below.
        Err(Error::Done) => (),

        // Either an event or a real error: hand it to the caller as is.
        outcome => return outcome,
    }

    close_conn_if_critical_stream_finished(conn, stream_id)?;

    Err(Error::Done)
}
/// Drives the per-stream HTTP/3 state machine for `stream_id` using the data
/// currently available on the transport connection.
///
/// The stream is created in `self.streams` if it is not tracked yet. The
/// loop then advances through the stream states until an event is produced
/// (returned as `Ok((id, event))`), an error occurs, or no more progress can
/// be made, in which case `Error::Done` is returned.
///
/// When `polling` is `false`, the `FramePayload` and `Data` states stop the
/// loop without emitting events.
fn process_readable_stream<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64, polling: bool,
) -> Result<(u64, Event)> {
// Make sure the stream is tracked before entering the state machine.
self.streams
.entry(stream_id)
.or_insert_with(|| <stream::Stream>::new(stream_id, false));
// The stream is looked up again on every iteration; each arm below
// handles one parsing state.
while let Some(stream) = self.streams.get_mut(&stream_id) {
match stream.state() {
// Parse the stream type varint.
stream::State::StreamType => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
// Could not consume a varint yet: re-run the loop for this state.
Err(_) => continue,
};
let ty = stream::Type::deserialize(varint)?;
if let Err(e) = stream.set_ty(ty) {
conn.close(true, e.to_wire(), b"")?;
return Err(e);
}
qlog_with_type!(QLOG_STREAM_TYPE_SET, conn.qlog, q, {
// The raw type value is only logged for unknown stream types.
let ty_val = if matches!(ty, stream::Type::Unknown) {
Some(varint)
} else {
None
};
let ev_data =
EventData::Http3StreamTypeSet(StreamTypeSet {
stream_id,
initiator: Some(Initiator::Remote),
stream_type: ty.to_qlog(),
stream_type_bytes: ty_val,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
match &ty {
stream::Type::Control => {
// A second peer control stream is a connection error.
if self.peer_control_stream_id.is_some() {
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Received multiple control streams",
)?;
return Err(Error::StreamCreationError);
}
trace!(
"{} open peer's control stream {}",
conn.trace_id(),
stream_id
);
close_conn_if_critical_stream_finished(
conn, stream_id,
)?;
self.peer_control_stream_id = Some(stream_id);
},
stream::Type::Push => {
// A server never accepts a push stream from its peer.
if self.is_server {
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Server received push stream.",
)?;
return Err(Error::StreamCreationError);
}
},
stream::Type::QpackEncoder => {
// A second peer QPACK encoder stream is a connection error.
if self.peer_qpack_streams.encoder_stream_id.is_some()
{
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Received multiple QPACK encoder streams",
)?;
return Err(Error::StreamCreationError);
}
close_conn_if_critical_stream_finished(
conn, stream_id,
)?;
self.peer_qpack_streams.encoder_stream_id =
Some(stream_id);
},
stream::Type::QpackDecoder => {
// A second peer QPACK decoder stream is a connection error.
if self.peer_qpack_streams.decoder_stream_id.is_some()
{
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Received multiple QPACK decoder streams",
)?;
return Err(Error::StreamCreationError);
}
close_conn_if_critical_stream_finished(
conn, stream_id,
)?;
self.peer_qpack_streams.decoder_stream_id =
Some(stream_id);
},
// Unknown stream types need no extra bookkeeping here.
stream::Type::Unknown => {
},
// NOTE(review): presumably request streams never pass through the
// stream-type state; confirm against `stream::Stream::new()`.
stream::Type::Request => unreachable!(),
}
},
// Parse the push ID varint.
stream::State::PushId => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
Err(_) => continue,
};
if let Err(e) = stream.set_push_id(varint) {
conn.close(true, e.to_wire(), b"")?;
return Err(e);
}
},
// Parse the frame type varint.
stream::State::FrameType => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
Err(_) => continue,
};
match stream.set_frame_type(varint) {
// Unexpected frames get a close reason that names the frame type.
Err(Error::FrameUnexpected) => {
let msg = format!("Unexpected frame type {varint}");
conn.close(
true,
Error::FrameUnexpected.to_wire(),
msg.as_bytes(),
)?;
return Err(Error::FrameUnexpected);
},
Err(e) => {
conn.close(
true,
e.to_wire(),
b"Error handling frame.",
)?;
return Err(e);
},
_ => (),
}
},
// Parse the frame payload length varint.
stream::State::FramePayloadLen => {
stream.try_fill_buffer(conn)?;
let payload_len = match stream.try_consume_varint() {
Ok(v) => v,
Err(_) => continue,
};
// DATA frames are traced and logged here, as soon as their length is
// known.
if Some(frame::DATA_FRAME_TYPE_ID) == stream.frame_type() {
trace!(
"{} rx frm DATA stream={} wire_payload_len={}",
conn.trace_id(),
stream_id,
payload_len
);
qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
let frame = Http3Frame::Data { raw: None };
let ev_data =
EventData::Http3FrameParsed(FrameParsed {
stream_id,
length: Some(payload_len),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
}
if let Err(e) = stream.set_frame_payload_len(payload_len) {
conn.close(true, e.to_wire(), b"")?;
return Err(e);
}
},
// Parse the frame payload and dispatch the resulting frame.
stream::State::FramePayload => {
// Frames are only consumed while polling.
if !polling {
break;
}
stream.try_fill_buffer(conn)?;
let (frame, payload_len) = match stream.try_consume_frame() {
Ok(frame) => frame,
Err(Error::Done) => return Err(Error::Done),
Err(e) => {
conn.close(
true,
e.to_wire(),
b"Error handling frame.",
)?;
return Err(e);
},
};
match self.process_frame(conn, stream_id, frame, payload_len)
{
Ok(ev) => return Ok(ev),
// The frame produced no event. Keep parsing unless the transport
// stream has already finished.
Err(Error::Done) => {
if conn.stream_finished(stream_id) {
break;
}
},
Err(e) => return Err(e),
};
},
// DATA payload is pending: emit a `Data` event if one should be
// triggered.
stream::State::Data => {
if !polling {
break;
}
if !stream.try_trigger_data_event() {
break;
}
return Ok((stream_id, Event::Data));
},
// QPACK stream data is read into a scratch buffer and only counted.
stream::State::QpackInstruction => {
let mut d = [0; 4096];
// The loop has no `break`; it is left through `?` when `stream_recv()`
// or `close_conn_critical_stream()` returns an error.
loop {
let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;
match stream.ty() {
Some(stream::Type::QpackEncoder) =>
self.peer_qpack_streams.encoder_stream_bytes +=
recv as u64,
Some(stream::Type::QpackDecoder) =>
self.peer_qpack_streams.decoder_stream_bytes +=
recv as u64,
_ => unreachable!(),
};
if fin {
close_conn_critical_stream(conn)?;
}
}
},
// Stop reading from the stream: shut down the read side with code
// 0x100.
stream::State::Drain => {
conn.stream_shutdown(
stream_id,
crate::Shutdown::Read,
0x100,
)?;
break;
},
stream::State::Finished => break,
}
}
// No event was produced on this pass.
Err(Error::Done)
}
/// Marks a request or push stream as finished and queues its id in
/// `self.finished_streams`.
///
/// Does nothing when the stream is unknown, is already in the `Finished`
/// state, or is of any other type.
fn process_finished_stream(&mut self, stream_id: u64) {
    if let Some(stream) = self.streams.get_mut(&stream_id) {
        // Already handled: avoid queueing the stream twice.
        if stream.state() == stream::State::Finished {
            return;
        }

        let reports_finish = matches!(
            stream.ty(),
            Some(stream::Type::Request) | Some(stream::Type::Push)
        );

        if reports_finish {
            stream.finished();
            self.finished_streams.push_back(stream_id);
        }
    }
}
fn process_frame<F: BufFactory>(
&mut self, conn: &mut super::Connection<F>, stream_id: u64,
frame: frame::Frame, payload_len: u64,
) -> Result<(u64, Event)> {
trace!(
"{} rx frm {:?} stream={} payload_len={}",
conn.trace_id(),
frame,
stream_id,
payload_len
);
qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
if !matches!(frame, frame::Frame::Headers { .. }) {
let frame = frame.to_qlog();
let ev_data = EventData::Http3FrameParsed(FrameParsed {
stream_id,
length: Some(payload_len),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
}
});
match frame {
frame::Frame::Settings {
max_field_section_size,
qpack_max_table_capacity,
qpack_blocked_streams,
connect_protocol_enabled,
h3_datagram,
additional_settings,
raw,
..
} => {
self.peer_settings = ConnectionSettings {
max_field_section_size,
qpack_max_table_capacity,
qpack_blocked_streams,
connect_protocol_enabled,
h3_datagram,
additional_settings,
raw,
};
if let Some(1) = h3_datagram {
if conn.dgram_max_writable_len().is_none() {
conn.close(
true,
Error::SettingsError.to_wire(),
b"H3_DATAGRAM sent with value 1 but max_datagram_frame_size TP not set.",
)?;
return Err(Error::SettingsError);
}
}
},
frame::Frame::Headers { header_block } => {
if let Some(s) = self.streams.get_mut(&stream_id) {
if self.is_server && s.headers_received_count() == 2 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"Too many HEADERS frames",
)?;
return Err(Error::FrameUnexpected);
}
s.increment_headers_received();
}
let max_size = self
.local_settings
.max_field_section_size
.unwrap_or(u64::MAX);
let headers = match self
.qpack_decoder
.decode(&header_block[..], max_size)
{
Ok(v) => v,
Err(e) => {
let e = match e {
qpack::Error::HeaderListTooLarge =>
Error::ExcessiveLoad,
_ => Error::QpackDecompressionFailed,
};
conn.close(true, e.to_wire(), b"Error parsing headers.")?;
return Err(e);
},
};
qlog_with_type!(QLOG_FRAME_PARSED, conn.qlog, q, {
let qlog_headers = headers
.iter()
.map(|h| qlog::events::http3::HttpHeader {
name: Some(
String::from_utf8_lossy(h.name()).into_owned(),
),
name_bytes: None,
value: Some(
String::from_utf8_lossy(h.value()).into_owned(),
),
value_bytes: None,
})
.collect();
let frame = Http3Frame::Headers {
headers: qlog_headers,
raw: None,
};
let ev_data = EventData::Http3FrameParsed(FrameParsed {
stream_id,
length: Some(payload_len),
frame,
..Default::default()
});
q.add_event_data_now(ev_data).ok();
});
let more_frames = !conn.stream_finished(stream_id);
return Ok((stream_id, Event::Headers {
list: headers,
more_frames,
}));
},
frame::Frame::Data { .. } => {
},
frame::Frame::GoAway { id } => {
if !self.is_server && id % 4 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"GOAWAY received with ID of non-request stream",
)?;
return Err(Error::IdError);
}
if let Some(received_id) = self.peer_goaway_id {
if id > received_id {
conn.close(
true,
Error::IdError.to_wire(),
b"GOAWAY received with ID larger than previously received",
)?;
return Err(Error::IdError);
}
}
self.peer_goaway_id = Some(id);
return Ok((id, Event::GoAway));
},
frame::Frame::MaxPushId { push_id } => {
if !self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"MAX_PUSH_ID received by client",
)?;
return Err(Error::FrameUnexpected);
}
if push_id < self.max_push_id {
conn.close(
true,
Error::IdError.to_wire(),
b"MAX_PUSH_ID reduced limit",
)?;
return Err(Error::IdError);
}
self.max_push_id = push_id;
},
frame::Frame::PushPromise { .. } => {
if self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PUSH_PROMISE received by server",
)?;
return Err(Error::FrameUnexpected);
}
if stream_id % 4 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PUSH_PROMISE received on non-request stream",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::CancelPush { .. } => {
},
frame::Frame::PriorityUpdateRequest {
prioritized_element_id,
priority_field_value,
} => {
if !self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PRIORITY_UPDATE received by client",
)?;
return Err(Error::FrameUnexpected);
}
if prioritized_element_id % 4 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PRIORITY_UPDATE for request stream type with wrong ID",
)?;
return Err(Error::FrameUnexpected);
}
if prioritized_element_id > conn.streams.max_streams_bidi() * 4 {
conn.close(
true,
Error::IdError.to_wire(),
b"PRIORITY_UPDATE for request stream beyond max streams limit",
)?;
return Err(Error::IdError);
}
if conn.streams.is_collected(prioritized_element_id) {
return Err(Error::Done);
}
let stream =
self.streams.entry(prioritized_element_id).or_insert_with(
|| <stream::Stream>::new(prioritized_element_id, false),
);
let had_priority_update = stream.has_last_priority_update();
stream.set_last_priority_update(Some(priority_field_value));
if !had_priority_update {
return Ok((prioritized_element_id, Event::PriorityUpdate));
} else {
return Err(Error::Done);
}
},
frame::Frame::PriorityUpdatePush {
prioritized_element_id,
..
} => {
if !self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PRIORITY_UPDATE received by client",
)?;
return Err(Error::FrameUnexpected);
}
if prioritized_element_id % 3 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PRIORITY_UPDATE for push stream type with wrong ID",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::Unknown { .. } => (),
}
Err(Error::Done)
}
/// Returns a snapshot of the byte counters kept for the peer's QPACK
/// encoder and decoder streams.
#[inline]
pub fn stats(&self) -> Stats {
    let peer = &self.peer_qpack_streams;

    Stats {
        qpack_encoder_stream_recv_bytes: peer.encoder_stream_bytes,
        qpack_decoder_stream_recv_bytes: peer.decoder_stream_bytes,
    }
}
}
/// Generates a random GREASE value of the form `31 * n + 33`.
///
/// `n` is drawn with `rand_u64_uniform()` using the bound below. With
/// `n` equal to that bound the result is `2^62 - 2`, so the value stays
/// within the 62-bit varint range as long as `n` does not exceed it.
pub fn grease_value() -> u64 {
    // Bound handed to the random number generator.
    const N_BOUND: u64 = 148_764_065_110_560_899;

    let n = super::rand::rand_u64_uniform(N_BOUND);

    n * 31 + 33
}
/// Helpers for driving a client and a server HTTP/3 connection against each
/// other over an in-memory transport pipe.
#[doc(hidden)]
#[cfg(any(test, feature = "internal"))]
pub mod testing {
    use super::*;

    use crate::test_utils;
    use crate::DefaultBufFactory;

    /// A client/server HTTP/3 connection pair plus the transport pipe that
    /// links them.
    pub struct Session<F = DefaultBufFactory>
    where
        F: BufFactory,
    {
        /// Transport-level pipe holding both QUIC connections.
        pub pipe: test_utils::Pipe<F>,
        /// HTTP/3 state for the client side.
        pub client: Connection,
        /// HTTP/3 state for the server side.
        pub server: Connection,
    }

    impl Session {
        /// Creates a session with the default configurations and the
        /// default buffer factory.
        pub fn new() -> Result<Session> {
            Session::<DefaultBufFactory>::new_with_buf()
        }

        /// Creates a session from explicit transport and HTTP/3
        /// configurations, using the default buffer factory.
        pub fn with_configs(
            config: &mut crate::Config, h3_config: &Config,
        ) -> Result<Session> {
            Session::<DefaultBufFactory>::with_configs_and_buf(config, h3_config)
        }

        /// Builds the transport and HTTP/3 configurations used by default
        /// in tests.
        ///
        /// The certificate and key are loaded from the `examples` directory
        /// relative to `CARGO_MANIFEST_DIR`; this panics if those paths
        /// cannot be canonicalized.
        pub fn default_configs() -> Result<(crate::Config, Config)> {
            fn path_relative_to_manifest_dir(path: &str) -> String {
                let full =
                    std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(path);

                std::fs::canonicalize(full)
                    .unwrap()
                    .to_string_lossy()
                    .into_owned()
            }

            let cert = path_relative_to_manifest_dir("examples/cert.crt");
            let key = path_relative_to_manifest_dir("examples/cert.key");

            let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
            config.load_cert_chain_from_pem_file(&cert)?;
            config.load_priv_key_from_pem_file(&key)?;
            config.set_application_protos(&[b"h3"])?;

            // Flow control and stream limits.
            config.set_initial_max_data(1500);
            config.set_initial_max_stream_data_bidi_local(150);
            config.set_initial_max_stream_data_bidi_remote(150);
            config.set_initial_max_stream_data_uni(150);
            config.set_initial_max_streams_bidi(5);
            config.set_initial_max_streams_uni(5);

            config.verify_peer(false);
            config.enable_dgram(true, 3, 3);
            config.set_ack_delay_exponent(8);

            let h3_config = Config::new()?;

            Ok((config, h3_config))
        }
    }

    impl<F: BufFactory> Session<F> {
        /// Creates a session with the default configurations and buffer
        /// factory `F`.
        pub fn new_with_buf() -> Result<Session<F>> {
            let (mut config, h3_config) = Session::default_configs()?;

            Session::with_configs_and_buf(&mut config, &h3_config)
        }

        /// Creates a session from explicit configurations with buffer
        /// factory `F`.
        ///
        /// Each HTTP/3 connection is told whether datagrams are enabled on
        /// its side of the pipe.
        pub fn with_configs_and_buf(
            config: &mut crate::Config, h3_config: &Config,
        ) -> Result<Session<F>> {
            let pipe = test_utils::Pipe::with_config_and_buf(config)?;

            let client_dgram = pipe.client.dgram_enabled();
            let server_dgram = pipe.server.dgram_enabled();

            let client = Connection::new(h3_config, false, client_dgram)?;
            let server = Connection::new(h3_config, true, server_dgram)?;

            Ok(Session {
                pipe,
                client,
                server,
            })
        }

        /// Completes the transport handshake, then opens the control, QPACK
        /// and (when greasing is enabled) GREASE streams on both sides, and
        /// finally polls both endpoints until they report an error.
        pub fn handshake(&mut self) -> Result<()> {
            self.pipe.handshake()?;

            // Client-initiated streams.
            self.client.send_settings(&mut self.pipe.client)?;
            self.pipe.advance().ok();

            self.client
                .open_qpack_encoder_stream(&mut self.pipe.client)?;
            self.pipe.advance().ok();

            self.client
                .open_qpack_decoder_stream(&mut self.pipe.client)?;
            self.pipe.advance().ok();

            if self.pipe.client.grease {
                self.client.open_grease_stream(&mut self.pipe.client)?;
            }

            self.pipe.advance().ok();

            // Server-initiated streams.
            self.server.send_settings(&mut self.pipe.server)?;
            self.pipe.advance().ok();

            self.server
                .open_qpack_encoder_stream(&mut self.pipe.server)?;
            self.pipe.advance().ok();

            self.server
                .open_qpack_decoder_stream(&mut self.pipe.server)?;
            self.pipe.advance().ok();

            if self.pipe.server.grease {
                self.server.open_grease_stream(&mut self.pipe.server)?;
            }

            self.advance().ok();

            // Drain whatever each endpoint has to process.
            while self.client.poll(&mut self.pipe.client).is_ok() {}

            while self.server.poll(&mut self.pipe.server).is_ok() {}

            Ok(())
        }

        /// Advances the underlying transport pipe.
        pub fn advance(&mut self) -> crate::Result<()> {
            self.pipe.advance()
        }

        /// Polls the client endpoint for an event.
        pub fn poll_client(&mut self) -> Result<(u64, Event)> {
            self.client.poll(&mut self.pipe.client)
        }

        /// Polls the server endpoint for an event.
        pub fn poll_server(&mut self) -> Result<(u64, Event)> {
            self.server.poll(&mut self.pipe.server)
        }

        /// Sends a fixed GET request from the client and advances the pipe.
        ///
        /// Returns the stream id together with the headers that were sent.
        pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
            let headers = vec![
                Header::new(b":method", b"GET"),
                Header::new(b":scheme", b"https"),
                Header::new(b":authority", b"quic.tech"),
                Header::new(b":path", b"/test"),
                Header::new(b"user-agent", b"quiche-test"),
            ];

            let stream_id =
                self.client
                    .send_request(&mut self.pipe.client, &headers, fin)?;

            self.advance().ok();

            Ok((stream_id, headers))
        }

        /// Sends a fixed 200 response from the server on `stream` and
        /// advances the pipe. Returns the headers that were sent.
        pub fn send_response(
            &mut self, stream: u64, fin: bool,
        ) -> Result<Vec<Header>> {
            let headers = vec![
                Header::new(b":status", b"200"),
                Header::new(b"server", b"quiche-test"),
            ];

            self.server
                .send_response(&mut self.pipe.server, stream, &headers, fin)?;

            self.advance().ok();

            Ok(headers)
        }

        /// Sends a fixed 10-byte body from the client on `stream` and
        /// advances the pipe. Returns the bytes that were sent.
        pub fn send_body_client(
            &mut self, stream: u64, fin: bool,
        ) -> Result<Vec<u8>> {
            let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

            self.client
                .send_body(&mut self.pipe.client, stream, &payload, fin)?;

            self.advance().ok();

            Ok(payload)
        }

        /// Reads body data received by the client on `stream` into `buf`.
        pub fn recv_body_client(
            &mut self, stream: u64, buf: &mut [u8],
        ) -> Result<usize> {
            self.client.recv_body(&mut self.pipe.client, stream, buf)
        }

        /// Reads body data received by the client on `stream` into a
        /// `BufMut`.
        pub fn recv_body_buf_client<B: bytes::BufMut>(
            &mut self, stream: u64, buf: B,
        ) -> Result<usize> {
            self.client
                .recv_body_buf(&mut self.pipe.client, stream, buf)
        }

        /// Sends a fixed 10-byte body from the server on `stream` and
        /// advances the pipe. Returns the bytes that were sent.
        pub fn send_body_server(
            &mut self, stream: u64, fin: bool,
        ) -> Result<Vec<u8>> {
            let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

            self.server
                .send_body(&mut self.pipe.server, stream, &payload, fin)?;

            self.advance().ok();

            Ok(payload)
        }

        /// Reads body data received by the server on `stream` into `buf`.
        pub fn recv_body_server(
            &mut self, stream: u64, buf: &mut [u8],
        ) -> Result<usize> {
            self.server.recv_body(&mut self.pipe.server, stream, buf)
        }

        /// Reads body data received by the server on `stream` into a
        /// `BufMut`.
        pub fn recv_body_buf_server<B: bytes::BufMut>(
            &mut self, stream: u64, buf: B,
        ) -> Result<usize> {
            self.server
                .recv_body_buf(&mut self.pipe.server, stream, buf)
        }

        /// Serializes `frame` and writes it directly to the client's
        /// transport stream `stream_id`, then advances the pipe.
        pub fn send_frame_client(
            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
        ) -> Result<()> {
            let mut d = [42; 65535];

            let mut b = octets::OctetsMut::with_slice(&mut d);
            frame.to_bytes(&mut b)?;
            let written = b.off();

            self.pipe.client.stream_send(stream_id, &d[..written], fin)?;

            self.advance().ok();

            Ok(())
        }

        /// Sends a datagram from the client made of `flow_id` encoded as a
        /// varint followed by a fixed 10-byte payload. Returns the payload.
        pub fn send_dgram_client(&mut self, flow_id: u64) -> Result<Vec<u8>> {
            let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

            let total = octets::varint_len(flow_id) + payload.len();
            let mut d = vec![0; total];

            let mut b = octets::OctetsMut::with_slice(&mut d);
            b.put_varint(flow_id)?;
            b.put_bytes(&payload)?;

            self.pipe.client.dgram_send(&d)?;

            self.advance().ok();

            Ok(payload)
        }

        /// Receives a datagram on the client into `buf`.
        ///
        /// Returns the datagram length, the flow id parsed from its start,
        /// and the offset just past the flow id.
        pub fn recv_dgram_client(
            &mut self, buf: &mut [u8],
        ) -> Result<(usize, u64, usize)> {
            let len = self.pipe.client.dgram_recv(buf)?;

            let mut b = octets::Octets::with_slice(buf);
            let flow_id = b.get_varint()?;

            Ok((len, flow_id, b.off()))
        }

        /// Sends a datagram from the server made of `flow_id` encoded as a
        /// varint followed by a fixed 10-byte payload. Returns the payload.
        pub fn send_dgram_server(&mut self, flow_id: u64) -> Result<Vec<u8>> {
            let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

            let total = octets::varint_len(flow_id) + payload.len();
            let mut d = vec![0; total];

            let mut b = octets::OctetsMut::with_slice(&mut d);
            b.put_varint(flow_id)?;
            b.put_bytes(&payload)?;

            self.pipe.server.dgram_send(&d)?;

            self.advance().ok();

            Ok(payload)
        }

        /// Receives a datagram on the server into `buf`.
        ///
        /// Returns the datagram length, the flow id parsed from its start,
        /// and the offset just past the flow id.
        pub fn recv_dgram_server(
            &mut self, buf: &mut [u8],
        ) -> Result<(usize, u64, usize)> {
            let len = self.pipe.server.dgram_recv(buf)?;

            let mut b = octets::Octets::with_slice(buf);
            let flow_id = b.get_varint()?;

            Ok((len, flow_id, b.off()))
        }

        /// Serializes `frame` and writes it directly to the server's
        /// transport stream `stream_id`, then advances the pipe.
        pub fn send_frame_server(
            &mut self, frame: frame::Frame, stream_id: u64, fin: bool,
        ) -> Result<()> {
            let mut d = [42; 65535];

            let mut b = octets::OctetsMut::with_slice(&mut d);
            frame.to_bytes(&mut b)?;
            let written = b.off();

            self.pipe.server.stream_send(stream_id, &d[..written], fin)?;

            self.advance().ok();

            Ok(())
        }

        /// Writes raw bytes to the client's transport stream `stream_id`
        /// and advances the pipe.
        pub fn send_arbitrary_stream_data_client(
            &mut self, data: &[u8], stream_id: u64, fin: bool,
        ) -> Result<()> {
            self.pipe.client.stream_send(stream_id, data, fin)?;

            self.advance().ok();

            Ok(())
        }

        /// Writes raw bytes to the server's transport stream `stream_id`
        /// and advances the pipe.
        pub fn send_arbitrary_stream_data_server(
            &mut self, data: &[u8], stream_id: u64, fin: bool,
        ) -> Result<()> {
            self.pipe.server.stream_send(stream_id, data, fin)?;

            self.advance().ok();

            Ok(())
        }
    }
}
#[cfg(test)]
mod tests {
use bytes::BufMut as _;
use super::*;
use super::testing::*;
#[test]
/// Checks that a generated GREASE value stays below `2^62 - 1`.
fn grease_value_in_varint_limit() {
    let limit = 2u64.pow(62) - 1;

    assert!(grease_value() < limit);
}
#[cfg(not(feature = "openssl"))]
#[test]
/// Resumes a session with early data enabled and checks when an HTTP/3
/// connection can be created on top of the transport, then verifies that
/// stream data sent in a 0-RTT packet is readable on the server.
fn h3_handshake_0rtt() {
    let mut buf = [0; 65535];

    let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    config
        .set_application_protos(&[b"proto1", b"proto2"])
        .unwrap();
    config.set_initial_max_data(30);
    config.set_initial_max_stream_data_bidi_local(15);
    config.set_initial_max_stream_data_bidi_remote(15);
    config.set_initial_max_stream_data_uni(15);
    config.set_initial_max_streams_bidi(3);
    config.set_initial_max_streams_uni(3);
    config.enable_early_data();
    config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    // First connection: complete a full handshake and grab the session.
    let mut first_pipe =
        crate::test_utils::Pipe::with_config(&mut config).unwrap();
    assert_eq!(first_pipe.handshake(), Ok(()));

    let session = first_pipe.client.session().unwrap();

    // Second connection: resume using the stored session.
    let mut pipe = crate::test_utils::Pipe::with_config(&mut config).unwrap();
    assert_eq!(pipe.client.set_session(session), Ok(()));

    // Before the client has sent anything, building the HTTP/3 connection
    // fails.
    assert!(matches!(
        Connection::with_transport(&mut pipe.client, &h3_config),
        Err(Error::InternalError)
    ));

    let (len, _) = pipe.client.send(&mut buf).unwrap();

    // After the first flight has been produced it succeeds.
    assert!(Connection::with_transport(&mut pipe.client, &h3_config).is_ok());
    assert_eq!(pipe.server_recv(&mut buf[..len]), Ok(len));

    // Send stream data in a 0-RTT packet.
    let pkt_type = crate::packet::Type::ZeroRTT;

    let frames = [crate::frame::Frame::Stream {
        stream_id: 6,
        data: <crate::range_buf::RangeBuf>::from(b"aaaaa", 0, true),
    }];

    assert_eq!(
        pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
        Ok(1200)
    );

    assert_eq!(pipe.server.undecryptable_pkts.len(), 0);

    // Stream 6 is the only readable stream on the server.
    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), Some(6));
    assert_eq!(readable.next(), None);

    let mut received = [0; 15];
    assert_eq!(pipe.server.stream_recv(6, &mut received), Ok((5, true)));
    assert_eq!(&received[..5], b"aaaaa");
}
#[test]
/// Sends a request without a body and receives a response without a body.
fn request_no_body_response_no_body() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();
    assert_eq!(stream, 0);

    // Server side: headers followed by the end of the stream.
    let expected_req = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let resp = sess.send_response(stream, true).unwrap();

    // Client side: headers followed by the end of the stream.
    let expected_resp = Event::Headers {
        list: resp,
        more_frames: false,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sends a request without a body and receives a response with a single
/// body chunk.
fn request_no_body_response_one_chunk() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();
    assert_eq!(stream, 0);

    let expected_req = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    // Response headers, then one body chunk that also ends the stream.
    let resp = sess.send_response(stream, false).unwrap();
    let body = sess.send_body_server(stream, true).unwrap();

    let mut recv_buf = vec![0; body.len()];

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: true,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(
        sess.recv_body_client(stream, &mut recv_buf),
        Ok(body.len())
    );
    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sends a request without a body and receives a response split over
/// several body chunks.
fn request_no_body_response_many_chunks() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();

    let expected_req = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let total_data_frames = 4;

    let resp = sess.send_response(stream, false).unwrap();

    // All chunks but the last leave the stream open.
    for _ in 0..total_data_frames - 1 {
        sess.send_body_server(stream, false).unwrap();
    }

    let body = sess.send_body_server(stream, true).unwrap();

    let mut recv_buf = vec![0; body.len()];

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: true,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));

    // A single Data event is reported for the pending chunks.
    assert_eq!(sess.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(sess.poll_client(), Err(Error::Done));

    for _ in 0..total_data_frames {
        assert_eq!(
            sess.recv_body_client(stream, &mut recv_buf),
            Ok(body.len())
        );
    }

    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sends a request without a body and reads a multi-chunk response through
/// the `BufMut`-based receive API, using read sizes that do not line up
/// with the chunk size.
fn request_no_body_response_many_chunks_with_buf() {
    let (mut config, h3_config) = Session::default_configs().unwrap();

    // Raise the limits so that the large chunks can be sent.
    config.set_initial_congestion_window_packets(100);
    config.set_initial_max_data(200_000);
    config.set_initial_max_stream_data_bidi_local(200_000);
    config.set_initial_max_stream_data_bidi_remote(200_000);

    let mut sess = Session::with_configs(&mut config, &h3_config).unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();

    let expected_req = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let total_data_frames = 4;
    let data = vec![0xab_u8; 16 * 1024];

    let resp = sess.send_response(stream, false).unwrap();

    for _ in 0..total_data_frames - 1 {
        assert_eq!(
            sess.server
                .send_body(&mut sess.pipe.server, stream, &data, false),
            Ok(data.len())
        );
        sess.advance().ok();
    }

    // The last chunk also finishes the stream.
    sess.server
        .send_body(&mut sess.pipe.server, stream, &data, true)
        .unwrap();
    sess.advance().ok();

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: true,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(sess.poll_client(), Err(Error::Done));

    // The first read takes two thirds of a chunk.
    let how_much_to_read_per_call = data.len() * 2 / 3;
    let mut remaining_to_read = total_data_frames * data.len();

    let mut recv_buf = Vec::new().limit(how_much_to_read_per_call);
    assert_eq!(
        sess.recv_body_buf_client(stream, &mut recv_buf),
        Ok(how_much_to_read_per_call)
    );
    remaining_to_read -= how_much_to_read_per_call;
    assert_eq!(recv_buf.get_ref().len(), how_much_to_read_per_call);

    // Every following read allows up to one full chunk.
    while remaining_to_read > 0 {
        recv_buf.set_limit(data.len());

        let expected = std::cmp::min(data.len(), remaining_to_read);
        assert_eq!(
            sess.recv_body_buf_client(stream, &mut recv_buf),
            Ok(expected)
        );

        remaining_to_read -= expected;
    }

    assert_eq!(recv_buf.get_ref().len(), total_data_frames * data.len());

    // Nothing is left to read.
    assert_eq!(
        sess.recv_body_buf_client(stream, &mut recv_buf),
        Err(Error::Done)
    );

    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sends a request with a single body chunk and receives a response
/// without a body.
fn request_one_chunk_response_no_body() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(false).unwrap();

    let body = sess.send_body_client(stream, true).unwrap();

    let mut recv_buf = vec![0; body.len()];

    let expected_req = Event::Headers {
        list: req,
        more_frames: true,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(
        sess.recv_body_server(stream, &mut recv_buf),
        Ok(body.len())
    );
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let resp = sess.send_response(stream, true).unwrap();

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: false,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
}
#[test]
/// Sends a request with several body chunks and receives a response
/// without a body.
fn request_many_chunks_response_no_body() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(false).unwrap();

    let total_data_frames = 4;

    // All chunks but the last leave the stream open.
    for _ in 0..total_data_frames - 1 {
        sess.send_body_client(stream, false).unwrap();
    }

    let body = sess.send_body_client(stream, true).unwrap();

    let mut recv_buf = vec![0; body.len()];

    let expected_req = Event::Headers {
        list: req,
        more_frames: true,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));

    // A single Data event is reported for the pending chunks.
    assert_eq!(sess.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(sess.poll_server(), Err(Error::Done));

    for _ in 0..total_data_frames {
        assert_eq!(
            sess.recv_body_server(stream, &mut recv_buf),
            Ok(body.len())
        );
    }

    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let resp = sess.send_response(stream, true).unwrap();

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: false,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
}
#[test]
/// Sends three requests with two body chunks each and checks the events
/// seen by the server, then sends body-less responses and checks the events
/// seen by the client.
fn many_requests_many_chunks_response_one_chunk() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    // Open three request streams.
    let (stream1, req1) = sess.send_request(false).unwrap();
    assert_eq!(stream1, 0);

    let (stream2, req2) = sess.send_request(false).unwrap();
    assert_eq!(stream2, 4);

    let (stream3, req3) = sess.send_request(false).unwrap();
    assert_eq!(stream3, 8);

    let reqs = vec![req1, req2, req3];

    // First chunk on each stream, in stream order.
    let body = sess.send_body_client(stream1, false).unwrap();
    sess.send_body_client(stream2, false).unwrap();
    sess.send_body_client(stream3, false).unwrap();

    let mut recv_buf = vec![0; body.len()];

    // Final chunk on each stream, in reverse stream order.
    sess.send_body_client(stream3, true).unwrap();
    sess.send_body_client(stream2, true).unwrap();
    sess.send_body_client(stream1, true).unwrap();

    // The server reports the headers of the three requests in the order
    // they were sent.
    for req in &reqs {
        let (_, ev) = sess.poll_server().unwrap();

        let expected = Event::Headers {
            list: req.clone(),
            more_frames: true,
        };
        assert_eq!(ev, expected);
    }

    // Each stream then yields a Data event, two chunks and a Finished
    // event.
    for &id in &[0u64, 4, 8] {
        assert_eq!(sess.poll_server(), Ok((id, Event::Data)));
        assert_eq!(sess.recv_body_server(id, &mut recv_buf), Ok(body.len()));
        assert_eq!(sess.poll_client(), Err(Error::Done));
        assert_eq!(sess.recv_body_server(id, &mut recv_buf), Ok(body.len()));
        assert_eq!(sess.poll_server(), Ok((id, Event::Finished)));
    }

    assert_eq!(sess.poll_server(), Err(Error::Done));

    // Respond on every stream.
    let resp1 = sess.send_response(stream1, true).unwrap();
    let resp2 = sess.send_response(stream2, true).unwrap();
    let resp3 = sess.send_response(stream3, true).unwrap();

    let resps = vec![resp1, resp2, resp3];

    for _ in 0..resps.len() {
        let (stream, ev) = sess.poll_client().unwrap();

        // Stream ids 0, 4 and 8 map to response indices 0, 1 and 2.
        let expected = Event::Headers {
            list: resps[(stream / 4) as usize].clone(),
            more_frames: false,
        };
        assert_eq!(ev, expected);
        assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    }

    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sends a request without a body and receives a response with one body
/// chunk, where the stream is finished by a separate empty write.
fn request_no_body_response_one_chunk_empty_fin() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();

    let expected_req = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    // Headers and body are sent without finishing the stream.
    let resp = sess.send_response(stream, false).unwrap();
    let body = sess.send_body_server(stream, false).unwrap();

    let mut recv_buf = vec![0; body.len()];

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: true,
    };
    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(
        sess.recv_body_client(stream, &mut recv_buf),
        Ok(body.len())
    );

    // Finish the stream with a zero-length write on the transport.
    assert_eq!(sess.pipe.server.stream_send(stream, &[], true), Ok(0));
    sess.advance().ok();

    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sends a request without a body and receives a response whose headers
/// are followed by a hand-written frame of a GREASE type that finishes the
/// stream; the client reports only the headers and the end of the stream.
fn request_no_body_response_no_body_with_grease() {
    let mut sess = Session::new().unwrap();
    sess.handshake().unwrap();

    let (stream, req) = sess.send_request(true).unwrap();
    assert_eq!(stream, 0);

    let expected_req = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(sess.poll_server(), Ok((stream, expected_req)));
    assert_eq!(sess.poll_server(), Ok((stream, Event::Finished)));

    let resp = sess.send_response(stream, false).unwrap();

    let expected_resp = Event::Headers {
        list: resp,
        more_frames: true,
    };

    // Write the extra frame piece by piece: type, length, then payload.
    let mut d = [42; 10];
    let mut b = octets::OctetsMut::with_slice(&mut d);

    let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
    sess.pipe.server.stream_send(0, frame_type, false).unwrap();

    let frame_len = b.put_varint(10).unwrap();
    sess.pipe.server.stream_send(0, frame_len, false).unwrap();

    sess.pipe.server.stream_send(0, &d, true).unwrap();

    sess.advance().ok();

    assert_eq!(sess.poll_client(), Ok((stream, expected_resp)));
    assert_eq!(sess.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(sess.poll_client(), Err(Error::Done));
}
#[test]
/// Sending a response body before any response headers is rejected with
/// `FrameUnexpected`, and the client observes no event.
fn body_response_before_headers() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(true).unwrap();
    assert_eq!(stream, 0);

    let request_event = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Finished)));

    let premature_body = session.send_body_server(stream, true);
    assert_eq!(premature_body, Err(Error::FrameUnexpected));

    assert_eq!(session.poll_client(), Err(Error::Done));
}
#[test]
/// The client must not be able to send a body on stream 0 (no request has
/// been sent on it) nor on any of the local or peer control / QPACK streams.
fn send_body_invalid_client_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Collect every stream ID that should be refused before issuing any send.
    let endpoint = &session.client;
    let refused_streams = [
        0,
        endpoint.control_stream_id.unwrap(),
        endpoint.local_qpack_streams.encoder_stream_id.unwrap(),
        endpoint.local_qpack_streams.decoder_stream_id.unwrap(),
        endpoint.peer_control_stream_id.unwrap(),
        endpoint.peer_qpack_streams.encoder_stream_id.unwrap(),
        endpoint.peer_qpack_streams.decoder_stream_id.unwrap(),
    ];

    for &stream_id in refused_streams.iter() {
        assert_eq!(
            session.send_body_client(stream_id, true),
            Err(Error::FrameUnexpected)
        );
    }
}
#[test]
/// The server must not be able to send a body on stream 0 (no request has
/// been received on it) nor on any of the local or peer control / QPACK
/// streams.
fn send_body_invalid_server_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Collect every stream ID that should be refused before issuing any send.
    let endpoint = &session.server;
    let refused_streams = [
        0,
        endpoint.control_stream_id.unwrap(),
        endpoint.local_qpack_streams.encoder_stream_id.unwrap(),
        endpoint.local_qpack_streams.decoder_stream_id.unwrap(),
        endpoint.peer_control_stream_id.unwrap(),
        endpoint.peer_qpack_streams.encoder_stream_id.unwrap(),
        endpoint.peer_qpack_streams.decoder_stream_id.unwrap(),
    ];

    for &stream_id in refused_streams.iter() {
        assert_eq!(
            session.send_body_server(stream_id, true),
            Err(Error::FrameUnexpected)
        );
    }
}
#[test]
/// A request made of headers, a body chunk and a final additional header
/// section is delivered to the server as Headers, Data, Headers, Finished.
fn trailers() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();
    let body = session.send_body_client(stream, false).unwrap();

    let trailer_fields = vec![Header::new(b"foo", b"bar")];
    session
        .client
        .send_additional_headers(
            &mut session.pipe.client,
            stream,
            &trailer_fields,
            true,
            true,
        )
        .unwrap();
    session.advance().ok();

    let body_len = body.len();
    let mut recv_buf = vec![0; body_len];

    let initial_event = Event::Headers {
        list: req,
        more_frames: true,
    };
    let trailer_event = Event::Headers {
        list: trailer_fields,
        more_frames: false,
    };

    assert_eq!(session.poll_server(), Ok((stream, initial_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(
        session.recv_body_server(stream, &mut recv_buf),
        Ok(body_len)
    );
    assert_eq!(session.poll_server(), Ok((stream, trailer_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Finished)));
}
#[test]
/// The server sends a 103 response followed by a 200 response (the latter via
/// `send_additional_headers`); the client sees both header sections in order
/// and then `Finished`.
fn informational_response() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(true).unwrap();
    assert_eq!(stream, 0);

    let request_event = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Finished)));

    let interim = vec![
        Header::new(b":status", b"103"),
        Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
    ];
    let final_resp = vec![
        Header::new(b":status", b"200"),
        Header::new(b"server", b"quiche-test"),
    ];

    session
        .server
        .send_response(&mut session.pipe.server, stream, &interim, false)
        .unwrap();
    session
        .server
        .send_additional_headers(
            &mut session.pipe.server,
            stream,
            &final_resp,
            false,
            true,
        )
        .unwrap();
    session.advance().ok();

    let interim_event = Event::Headers {
        list: interim,
        more_frames: true,
    };
    let final_event = Event::Headers {
        list: final_resp,
        more_frames: false,
    };

    assert_eq!(session.poll_client(), Ok((stream, interim_event)));
    assert_eq!(session.poll_client(), Ok((stream, final_event)));
    assert_eq!(session.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(session.poll_client(), Err(Error::Done));
}
#[test]
/// Calling `send_response` a second time on the same stream fails with
/// `FrameUnexpected`; the client only ever sees the first header section.
fn no_multiple_response() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(true).unwrap();
    assert_eq!(stream, 0);

    let request_event = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Finished)));

    let interim = vec![
        Header::new(b":status", b"103"),
        Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
    ];
    let final_resp = vec![
        Header::new(b":status", b"200"),
        Header::new(b"server", b"quiche-test"),
    ];

    session
        .server
        .send_response(&mut session.pipe.server, stream, &interim, false)
        .unwrap();

    let second_attempt = session.server.send_response(
        &mut session.pipe.server,
        stream,
        &final_resp,
        true,
    );
    assert_eq!(Err(Error::FrameUnexpected), second_attempt);

    session.advance().ok();

    let interim_event = Event::Headers {
        list: interim,
        more_frames: true,
    };
    assert_eq!(session.poll_client(), Ok((stream, interim_event)));
    assert_eq!(session.poll_client(), Err(Error::Done));
}
#[test]
/// `send_additional_headers` is refused with `FrameUnexpected` when no
/// initial response has been sent on the stream, and nothing reaches the
/// client.
fn no_send_additional_before_initial_response() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(true).unwrap();
    assert_eq!(stream, 0);

    let request_event = Event::Headers {
        list: req,
        more_frames: false,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Finished)));

    let interim = vec![
        Header::new(b":status", b"103"),
        Header::new(b"link", b"<https://example.com>; rel=\"preconnect\""),
    ];

    let outcome = session.server.send_additional_headers(
        &mut session.pipe.server,
        stream,
        &interim,
        false,
        false,
    );
    assert_eq!(Err(Error::FrameUnexpected), outcome);

    session.advance().ok();
    assert_eq!(session.poll_client(), Err(Error::Done));
}
#[test]
/// The client may send an additional header section straight after the
/// request headers, without any body in between; the server receives both
/// sections and the stream stays open.
fn additional_headers_before_data_client() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();

    let extra_fields = vec![Header::new(b"goodbye", b"world")];
    let sent = session.client.send_additional_headers(
        &mut session.pipe.client,
        stream,
        &extra_fields,
        true,
        false,
    );
    assert_eq!(sent, Ok(()));
    session.advance().ok();

    let first_section = Event::Headers {
        list: req,
        more_frames: true,
    };
    let second_section = Event::Headers {
        list: extra_fields,
        more_frames: true,
    };

    assert_eq!(session.poll_server(), Ok((stream, first_section)));
    assert_eq!(session.poll_server(), Ok((stream, second_section)));
    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// A DATA frame that arrives after the client's trailing header section makes
/// the server fail with `FrameUnexpected`, after it has delivered the
/// headers, body and trailers that preceded it.
fn data_after_trailers_client() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();
    let body = session.send_body_client(stream, false).unwrap();
    let body_len = body.len();
    let mut recv_buf = vec![0; body_len];

    let trailer_fields = vec![Header::new(b"foo", b"bar")];
    session
        .client
        .send_additional_headers(
            &mut session.pipe.client,
            stream,
            &trailer_fields,
            true,
            false,
        )
        .unwrap();
    session.advance().ok();

    // Raw DATA frame written after the trailers.
    let late_data = frame::Frame::Data {
        payload: vec![1, 2, 3, 4],
    };
    session.send_frame_client(late_data, stream, true).unwrap();

    let initial_event = Event::Headers {
        list: req,
        more_frames: true,
    };
    let trailer_event = Event::Headers {
        list: trailer_fields,
        more_frames: true,
    };

    assert_eq!(session.poll_server(), Ok((stream, initial_event)));
    assert_eq!(session.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(
        session.recv_body_server(stream, &mut recv_buf),
        Ok(body_len)
    );
    assert_eq!(session.poll_server(), Ok((stream, trailer_event)));
    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// A MAX_PUSH_ID frame sent by the client on its control stream is accepted
/// by the server without producing any event.
fn max_push_id_from_client_good() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let max_push = frame::Frame::MaxPushId { push_id: 1 };
    session
        .send_frame_client(max_push, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// A MAX_PUSH_ID frame sent on a request stream is rejected by the server
/// with `FrameUnexpected` once the request headers have been delivered.
fn max_push_id_from_client_bad_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();

    let misplaced = frame::Frame::MaxPushId { push_id: 2 };
    session.send_frame_client(misplaced, stream, false).unwrap();

    let request_event = Event::Headers {
        list: req,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// Two MAX_PUSH_ID frames are sent where the second carries a smaller value
/// than the first; the server reports `IdError`.
fn max_push_id_from_client_limit_reduction() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Send the larger limit first, then the smaller one.
    for &push_id in [2, 1].iter() {
        let control_stream = session.client.control_stream_id.unwrap();
        session
            .send_frame_client(
                frame::Frame::MaxPushId { push_id },
                control_stream,
                false,
            )
            .unwrap();
    }

    assert_eq!(session.poll_server(), Err(Error::IdError));
}
#[test]
/// A MAX_PUSH_ID frame sent by the server on its control stream makes the
/// client fail with `FrameUnexpected`.
fn max_push_id_from_server() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let max_push = frame::Frame::MaxPushId { push_id: 1 };
    session
        .send_frame_server(max_push, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::FrameUnexpected));
}
#[test]
/// A PUSH_PROMISE frame sent by the client on a request stream is rejected by
/// the server with `FrameUnexpected` after the request headers are delivered.
fn push_promise_from_client() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();

    // Reuse the request's header list as the promised header block.
    let header_block = session.client.encode_header_block(&req).unwrap();
    let promise = frame::Frame::PushPromise {
        push_id: 1,
        header_block,
    };
    session.send_frame_client(promise, stream, false).unwrap();

    let request_event = Event::Headers {
        list: req,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// A CANCEL_PUSH frame sent by the client on its control stream is accepted
/// by the server without producing any event.
fn cancel_push_from_client() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let cancel = frame::Frame::CancelPush { push_id: 1 };
    session
        .send_frame_client(cancel, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// A CANCEL_PUSH frame sent on a request stream is rejected by the server
/// with `FrameUnexpected` once the request headers have been delivered.
fn cancel_push_from_client_bad_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream, req) = session.send_request(false).unwrap();

    let misplaced = frame::Frame::CancelPush { push_id: 2 };
    session.send_frame_client(misplaced, stream, false).unwrap();

    let request_event = Event::Headers {
        list: req,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((stream, request_event)));
    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// A CANCEL_PUSH frame sent by the server on its control stream is accepted
/// by the client without producing any event.
fn cancel_push_from_server() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let cancel = frame::Frame::CancelPush { push_id: 1 };
    session
        .send_frame_server(cancel, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::Done));
}
#[test]
/// The client sends GOAWAY with ID 100; the server surfaces a `GoAway` event,
/// which this test expects to be reported with ID 0.
fn goaway_from_client_good() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    session
        .client
        .send_goaway(&mut session.pipe.client, 100)
        .unwrap();
    session.advance().ok();

    let polled = session.poll_server();
    assert_eq!(polled, Ok((0, Event::GoAway)));
}
#[test]
/// The server sends GOAWAY with ID 4000; the client surfaces a `GoAway` event
/// carrying that same ID.
fn goaway_from_server_good() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    session
        .server
        .send_goaway(&mut session.pipe.server, 4000)
        .unwrap();
    session.advance().ok();

    let polled = session.poll_client();
    assert_eq!(polled, Ok((4000, Event::GoAway)));
}
#[test]
/// Once the client has processed a GOAWAY from the server, sending a new
/// request fails with `FrameUnexpected`.
fn client_request_after_goaway() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    session
        .server
        .send_goaway(&mut session.pipe.server, 4000)
        .unwrap();
    session.advance().ok();
    assert_eq!(session.poll_client(), Ok((4000, Event::GoAway)));

    let late_request = session.send_request(true);
    assert_eq!(late_request, Err(Error::FrameUnexpected));
}
#[test]
/// A GOAWAY frame from the server carrying ID 1 is rejected by the client
/// with `IdError`.
fn goaway_from_server_invalid_id() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let goaway = frame::Frame::GoAway { id: 1 };
    session
        .send_frame_server(goaway, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::IdError));
}
#[test]
/// The server sends GOAWAY with ID 0 and then with ID 4; the client reports
/// the first as a `GoAway` event and the second, larger one as `IdError`.
fn goaway_from_server_increase_id() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Both frames are sent before the client polls.
    for &id in [0, 4].iter() {
        let control_stream = session.server.control_stream_id.unwrap();
        session
            .send_frame_server(
                frame::Frame::GoAway { id },
                control_stream,
                false,
            )
            .unwrap();
    }

    assert_eq!(session.poll_client(), Ok((0, Event::GoAway)));
    assert_eq!(session.poll_client(), Err(Error::IdError));
}
#[test]
#[cfg(feature = "sfv")]
/// Table-driven check of `Priority::try_from` on raw priority field values:
/// each input is paired with the exact result the parser must return.
fn parse_priority_field_value() {
    let cases: [(&[u8], std::result::Result<Priority, Error>); 20] = [
        // Urgency only.
        (b"u=0".as_slice(), Ok(Priority::new(0, false))),
        (b"u=3".as_slice(), Ok(Priority::new(3, false))),
        (b"u=7".as_slice(), Ok(Priority::new(7, false))),
        // Urgency plus a bare `i` key.
        (b"u=0, i".as_slice(), Ok(Priority::new(0, true))),
        (b"u=3, i".as_slice(), Ok(Priority::new(3, true))),
        (b"u=7, i".as_slice(), Ok(Priority::new(7, true))),
        // Urgency plus an explicit boolean `i=?1`.
        (b"u=0, i=?1".as_slice(), Ok(Priority::new(0, true))),
        (b"u=3, i=?1".as_slice(), Ok(Priority::new(3, true))),
        (b"u=7, i=?1".as_slice(), Ok(Priority::new(7, true))),
        // Empty input yields urgency 3, non-incremental.
        (b"".as_slice(), Ok(Priority::new(3, false))),
        // Parameters attached to the keys do not change the result.
        (b"u=0;foo, i;bar".as_slice(), Ok(Priority::new(0, true))),
        (b"u=3;hello, i;world".as_slice(), Ok(Priority::new(3, true))),
        (b"u=7;croeso, i;gymru".as_slice(), Ok(Priority::new(7, true))),
        // An extra, unrelated key does not change the result.
        (b"u=0, i, spinaltap=11".as_slice(), Ok(Priority::new(0, true))),
        // Inputs that fail to parse, and out-of-range urgencies that come
        // back as urgency 7.
        (b"0".as_slice(), Err(Error::Done)),
        (b"u=-1".as_slice(), Ok(Priority::new(7, false))),
        (b"u=0.2".as_slice(), Err(Error::Done)),
        (b"u=100".as_slice(), Ok(Priority::new(7, false))),
        (b"u=3, i=true".as_slice(), Err(Error::Done)),
        (b"u=7, ".as_slice(), Err(Error::Done)),
    ];

    for (input, expected) in cases.iter() {
        assert_eq!(*expected, Priority::try_from(*input));
    }
}
#[test]
/// A single priority update sent by the client for stream 0 produces exactly
/// one `PriorityUpdate` event on the server.
fn priority_update_request() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let priority = Priority {
        urgency: 3,
        incremental: false,
    };
    session
        .client
        .send_priority_update_for_request(
            &mut session.pipe.client,
            0,
            &priority,
        )
        .unwrap();
    session.advance().ok();

    assert_eq!(session.poll_server(), Ok((0, Event::PriorityUpdate)));
    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// Checks how the `PriorityUpdate` event for one stream is re-armed: a second
/// update that arrives while the first is still untaken raises no new event,
/// and a new event is only raised after `take_last_priority_update` has been
/// called.
fn priority_update_single_stream_rearm() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();
// First update (urgency 3): the server reports one event for stream 0.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 3,
incremental: false,
})
.unwrap();
s.advance().ok();
assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
assert_eq!(s.poll_server(), Err(Error::Done));
// Second update (urgency 5) is sent before the first value was taken: no
// additional event is reported.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 5,
incremental: false,
})
.unwrap();
s.advance().ok();
assert_eq!(s.poll_server(), Err(Error::Done));
// Only the most recent value is available, and it can be taken once.
assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=5".to_vec()));
assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
// After the value was taken, a further update (urgency 7) raises an event
// again.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 7,
incremental: false,
})
.unwrap();
s.advance().ok();
assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=7".to_vec()));
assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
}
#[test]
/// Priority updates for streams 0, 4 and 8 are sent one flight at a time;
/// each raises its own `PriorityUpdate` event, and afterwards every stream's
/// stored field value can be taken.
fn priority_update_request_multiple_stream_arm_multiple_flights() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // (stream ID, urgency sent, field value expected on the server)
    let flights = [
        (0, 3, &b"u=3"[..]),
        (4, 1, &b"u=1"[..]),
        (8, 2, &b"u=2"[..]),
    ];

    for &(stream_id, urgency, _) in flights.iter() {
        let priority = Priority {
            urgency,
            incremental: false,
        };
        session
            .client
            .send_priority_update_for_request(
                &mut session.pipe.client,
                stream_id,
                &priority,
            )
            .unwrap();
        session.advance().ok();

        assert_eq!(
            session.poll_server(),
            Ok((stream_id, Event::PriorityUpdate))
        );
        assert_eq!(session.poll_server(), Err(Error::Done));
    }

    for &(stream_id, _, field_value) in flights.iter() {
        assert_eq!(
            session.server.take_last_priority_update(stream_id),
            Ok(field_value.to_vec())
        );
    }

    // Stream 0's value was already taken above.
    assert_eq!(
        session.server.take_last_priority_update(0),
        Err(Error::Done)
    );
}
#[test]
/// Three PRIORITY_UPDATE frames (streams 0, 4 and 8) are serialized back to
/// back and written to the client's control stream in one send; the server
/// raises one event per stream and stores each field value.
fn priority_update_request_multiple_stream_arm_single_flight() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let stream_ids = [0, 4, 8];

    let mut scratch = [42; 65535];
    let mut writer = octets::OctetsMut::with_slice(&mut scratch);

    for &prioritized_element_id in stream_ids.iter() {
        let update = frame::Frame::PriorityUpdateRequest {
            prioritized_element_id,
            priority_field_value: b"u=3".to_vec(),
        };
        update.to_bytes(&mut writer).unwrap();
    }

    // Only the bytes actually written are sent.
    let written = writer.off();
    let control_stream = session.client.control_stream_id.unwrap();
    session
        .pipe
        .client
        .stream_send(control_stream, &scratch[..written], false)
        .unwrap();
    session.advance().ok();

    for &stream_id in stream_ids.iter() {
        assert_eq!(
            session.poll_server(),
            Ok((stream_id, Event::PriorityUpdate))
        );
    }
    assert_eq!(session.poll_server(), Err(Error::Done));

    for &stream_id in stream_ids.iter() {
        assert_eq!(
            session.server.take_last_priority_update(stream_id),
            Ok(b"u=3".to_vec())
        );
    }

    // Stream 0's value was already taken above.
    assert_eq!(
        session.server.take_last_priority_update(0),
        Err(Error::Done)
    );
}
#[test]
/// Sends a priority update for stream 0 before the request itself, completes
/// a full request/response exchange on that stream, and then checks that a
/// further priority update for the same stream raises no event on the
/// server (per the test name, the stream is expected to have been collected
/// by then).
fn priority_update_request_collected_completed() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();
// The priority update is sent before any request exists on stream 0.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 3,
incremental: false,
})
.unwrap();
s.advance().ok();
let (stream, req) = s.send_request(true).unwrap();
let ev_headers = Event::Headers {
list: req,
more_frames: false,
};
// The server reports the priority update first, then the request.
assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
assert_eq!(s.poll_server(), Err(Error::Done));
// The stored value can be taken exactly once.
assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
// Finish the exchange with a response that ends the stream.
let resp = s.send_response(stream, true).unwrap();
let ev_headers = Event::Headers {
list: resp,
more_frames: false,
};
assert_eq!(s.poll_client(), Ok((stream, ev_headers)));
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
assert_eq!(s.poll_client(), Err(Error::Done));
// A priority update for the now-completed stream produces no event.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 3,
incremental: false,
})
.unwrap();
s.advance().ok();
assert_eq!(s.poll_server(), Err(Error::Done));
}
#[test]
/// Sends a priority update and an unfinished request on stream 0, shuts the
/// stream down in both directions from the client, and then checks that a
/// further priority update raises no event and that both endpoints report
/// the stream as collected.
fn priority_update_request_collected_stopped() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();
// The priority update is sent before any request exists on stream 0.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 3,
incremental: false,
})
.unwrap();
s.advance().ok();
let (stream, req) = s.send_request(false).unwrap();
let ev_headers = Event::Headers {
list: req,
more_frames: true,
};
// The server reports the priority update first, then the request headers;
// the request is left open.
assert_eq!(s.poll_server(), Ok((0, Event::PriorityUpdate)));
assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
assert_eq!(s.poll_server(), Err(Error::Done));
// The stored value can be taken exactly once.
assert_eq!(s.server.take_last_priority_update(0), Ok(b"u=3".to_vec()));
assert_eq!(s.server.take_last_priority_update(0), Err(Error::Done));
// The client shuts down both directions of the stream with code 0x100.
s.pipe
.client
.stream_shutdown(stream, crate::Shutdown::Write, 0x100)
.unwrap();
s.pipe
.client
.stream_shutdown(stream, crate::Shutdown::Read, 0x100)
.unwrap();
s.advance().ok();
// The server sees the shutdown as a Reset event carrying the same code.
assert_eq!(s.poll_server(), Ok((0, Event::Reset(0x100))));
assert_eq!(s.poll_server(), Err(Error::Done));
// A priority update for the stopped stream produces no event.
s.client
.send_priority_update_for_request(&mut s.pipe.client, 0, &Priority {
urgency: 3,
incremental: false,
})
.unwrap();
s.advance().ok();
assert_eq!(s.poll_server(), Err(Error::Done));
// Both transport connections report stream 0 as collected.
assert!(s.pipe.server.streams.is_collected(0));
assert!(s.pipe.client.streams.is_collected(0));
}
#[test]
/// A push-flavoured PRIORITY_UPDATE frame (element ID 3) sent on the client's
/// control stream is accepted by the server without producing an event.
fn priority_update_push() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdatePush {
        prioritized_element_id: 3,
        priority_field_value: b"u=3".to_vec(),
    };
    session
        .send_frame_client(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// A request-flavoured PRIORITY_UPDATE frame naming element ID 5 is rejected
/// by the server with `FrameUnexpected`.
fn priority_update_request_bad_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdateRequest {
        prioritized_element_id: 5,
        priority_field_value: b"u=3".to_vec(),
    };
    session
        .send_frame_client(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// A push-flavoured PRIORITY_UPDATE frame naming element ID 5 is rejected by
/// the server with `FrameUnexpected`.
fn priority_update_push_bad_stream() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdatePush {
        prioritized_element_id: 5,
        priority_field_value: b"u=3".to_vec(),
    };
    session
        .send_frame_client(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// A request-flavoured PRIORITY_UPDATE frame sent by the server makes the
/// client fail with `FrameUnexpected`.
fn priority_update_request_from_server() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdateRequest {
        prioritized_element_id: 0,
        priority_field_value: b"u=3".to_vec(),
    };
    session
        .send_frame_server(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::FrameUnexpected));
}
#[test]
/// A push-flavoured PRIORITY_UPDATE frame sent by the server makes the client
/// fail with `FrameUnexpected`.
fn priority_update_push_from_server() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.server.control_stream_id.unwrap();
    let update = frame::Frame::PriorityUpdatePush {
        prioritized_element_id: 0,
        priority_field_value: b"u=3".to_vec(),
    };
    session
        .send_frame_server(update, control_stream, false)
        .unwrap();

    assert_eq!(session.poll_client(), Err(Error::FrameUnexpected));
}
#[test]
/// A freshly created connection starts its unidirectional stream counter at
/// 2 when built with `false` as the second argument (the client case, going
/// by the original variable names) and at 3 when built with `true`.
fn uni_stream_local_counting() {
    let config = Config::new().unwrap();

    let client_conn = Connection::new(&config, false, false).unwrap();
    let server_conn = Connection::new(&config, true, false).unwrap();

    assert_eq!(client_conn.next_uni_stream_id, 2);
    assert_eq!(server_conn.next_uni_stream_id, 3);
}
#[test]
/// After the handshake, the client opens one more unidirectional stream and
/// announces it as a control stream; the server must fail with
/// `StreamCreationError`.
fn open_multiple_control_streams() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let extra_stream = session.client.next_uni_stream_id;

    // Only the stream-type varint is written to the new stream.
    let mut scratch = [42; 8];
    let mut writer = octets::OctetsMut::with_slice(&mut scratch);
    let type_prefix = writer
        .put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID)
        .unwrap();

    session
        .pipe
        .client
        .stream_send(extra_stream, type_prefix, false)
        .unwrap();
    session.advance().ok();

    assert_eq!(session.poll_server(), Err(Error::StreamCreationError));
}
#[test]
/// The client finishes its control stream with an empty fin-only write right
/// after the handshake; the server reports `ClosedCriticalStream` once and
/// `Done` afterwards.
fn close_control_stream_after_type() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    session
        .pipe
        .client
        .stream_send(control_stream, &[], true)
        .unwrap();
    session.advance().ok();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), first_poll);

    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), second_poll);
}
#[test]
/// The client sends a MAX_PUSH_ID frame on its control stream with the fin
/// flag set; the server reports `ClosedCriticalStream` once and `Done`
/// afterwards.
fn close_control_stream_after_frame() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let closing_frame = frame::Frame::MaxPushId { push_id: 1 };
    session
        .send_frame_client(closing_frame, control_stream, true)
        .unwrap();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), first_poll);

    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), second_poll);
}
#[test]
/// The client shuts down the write side of its control stream right after
/// the handshake; the server reports `ClosedCriticalStream` once and `Done`
/// afterwards.
fn reset_control_stream_after_type() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    session
        .pipe
        .client
        .stream_shutdown(control_stream, crate::Shutdown::Write, 0)
        .unwrap();
    session.advance().ok();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), first_poll);

    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), second_poll);
}
#[test]
/// The client sends a MAX_PUSH_ID frame on its control stream (which the
/// server accepts silently) and then shuts down the write side of that
/// stream; the server reports `ClosedCriticalStream` once and `Done`
/// afterwards.
fn reset_control_stream_after_frame() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let control_stream = session.client.control_stream_id.unwrap();
    let max_push = frame::Frame::MaxPushId { push_id: 1 };
    session
        .send_frame_client(max_push, control_stream, false)
        .unwrap();

    let after_frame = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), after_frame);

    let control_stream = session.client.control_stream_id.unwrap();
    session
        .pipe
        .client
        .stream_shutdown(control_stream, crate::Shutdown::Write, 0)
        .unwrap();
    session.advance().ok();

    let after_reset = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), after_reset);

    let final_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), final_poll);
}
#[test]
/// The client finishes its QPACK encoder stream with an empty fin-only write
/// right after the handshake; the server reports `ClosedCriticalStream` once
/// and `Done` afterwards.
fn close_qpack_stream_after_type() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();
    session
        .pipe
        .client
        .stream_send(encoder_stream, &[], true)
        .unwrap();
    session.advance().ok();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), first_poll);

    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), second_poll);
}
#[test]
/// The client writes two single bytes to its QPACK encoder stream, the second
/// with the fin flag set; the server reports `ClosedCriticalStream` once and
/// `Done` afterwards.
fn close_qpack_stream_after_data() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();
    let one_byte = [0; 1];

    // First write keeps the stream open, second one finishes it.
    for &fin in [false, true].iter() {
        session
            .pipe
            .client
            .stream_send(encoder_stream, &one_byte, fin)
            .unwrap();
    }
    session.advance().ok();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), first_poll);

    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), second_poll);
}
#[test]
/// The client shuts down the write side of its QPACK encoder stream right
/// after the handshake; the server reports `ClosedCriticalStream` once and
/// `Done` afterwards.
fn reset_qpack_stream_after_type() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();
    session
        .pipe
        .client
        .stream_shutdown(encoder_stream, crate::Shutdown::Write, 0)
        .unwrap();
    session.advance().ok();

    let first_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), first_poll);

    let second_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), second_poll);
}
#[test]
/// The client writes two single bytes to its QPACK encoder stream (which the
/// server accepts silently) and then shuts down the write side of that
/// stream; the server reports `ClosedCriticalStream` once and `Done`
/// afterwards.
fn reset_qpack_stream_after_data() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();
    let one_byte = [0; 1];

    for _ in 0..2 {
        session
            .pipe
            .client
            .stream_send(encoder_stream, &one_byte, false)
            .unwrap();
    }
    session.advance().ok();

    let after_data = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), after_data);

    session
        .pipe
        .client
        .stream_shutdown(encoder_stream, crate::Shutdown::Write, 0)
        .unwrap();
    session.advance().ok();

    let after_reset = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::ClosedCriticalStream), after_reset);

    let final_poll = session.server.poll(&mut session.pipe.server);
    assert_eq!(Err(Error::Done), final_poll);
}
#[test]
/// The client writes 20 bytes to each of its QPACK encoder and decoder
/// streams; the server produces no event but counts the bytes received on
/// each stream, both in its internal state and in `stats()`.
fn qpack_data() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();
    let decoder_stream = session
        .client
        .local_qpack_streams
        .decoder_stream_id
        .unwrap();
    let filler = [0; 20];

    session
        .pipe
        .client
        .stream_send(encoder_stream, &filler, false)
        .unwrap();
    session.advance().ok();

    session
        .pipe
        .client
        .stream_send(decoder_stream, &filler, false)
        .unwrap();
    session.advance().ok();

    // Anything other than `Done` (an event or a different error) is a
    // failure.
    let polled = session.server.poll(&mut session.pipe.server);
    if let Err(Error::Done) = polled {
        let peer_qpack = &session.server.peer_qpack_streams;
        assert_eq!(peer_qpack.encoder_stream_bytes, 20);
        assert_eq!(peer_qpack.decoder_stream_bytes, 20);
    } else {
        panic!();
    }

    let stats = session.server.stats();
    assert_eq!(stats.qpack_encoder_stream_recv_bytes, 20);
    assert_eq!(stats.qpack_decoder_stream_recv_bytes, 20);
}
#[test]
/// Exercises two frames that declare a payload length of `1 << 24`: a DATA
/// frame, for which the server reports a `Data` event, and a frame of type
/// 148_764_065_110_560_899, for which the server fails with `ExcessiveLoad`.
fn max_state_buf_size() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();
let req = vec![
Header::new(b":method", b"GET"),
Header::new(b":scheme", b"https"),
Header::new(b":authority", b"quic.tech"),
Header::new(b":path", b"/test"),
Header::new(b"user-agent", b"quiche-test"),
];
assert_eq!(
s.client.send_request(&mut s.pipe.client, &req, false),
Ok(0)
);
s.advance().ok();
let ev_headers = Event::Headers {
list: req,
more_frames: true,
};
assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, ev_headers)));
// Hand-write a DATA frame header declaring a 1 << 24 byte payload, then
// send the 128-byte scratch buffer as the beginning of that payload.
let mut d = [42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
s.pipe.client.stream_send(0, frame_type, false).unwrap();
let frame_len = b.put_varint(1 << 24).unwrap();
s.pipe.client.stream_send(0, frame_len, false).unwrap();
s.pipe.client.stream_send(0, &d, false).unwrap();
s.advance().ok();
assert_eq!(s.server.poll(&mut s.pipe.server), Ok((0, Event::Data)));
// Start over with a fresh session (shadowing the first one) and repeat the
// same bytes with a different frame type; no request headers are sent
// first this time.
let mut s = Session::new().unwrap();
s.handshake().unwrap();
let mut d = [42; 128];
let mut b = octets::OctetsMut::with_slice(&mut d);
let frame_type = b.put_varint(148_764_065_110_560_899).unwrap();
s.pipe.client.stream_send(0, frame_type, false).unwrap();
let frame_len = b.put_varint(1 << 24).unwrap();
s.pipe.client.stream_send(0, frame_len, false).unwrap();
s.pipe.client.stream_send(0, &d, false).unwrap();
s.advance().ok();
assert_eq!(s.server.poll(&mut s.pipe.server), Err(Error::ExcessiveLoad));
}
#[test]
/// Sends body chunks on one request stream until a send is only partially
/// accepted, then checks what the server can read back and which
/// "blocked" counters were incremented on each transport connection.
fn stream_backpressure() {
let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut s = Session::new().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(false).unwrap();
// Six sends of the 10-byte chunk are accepted in full.
let total_data_frames = 6;
for _ in 0..total_data_frames {
assert_eq!(
s.client
.send_body(&mut s.pipe.client, stream, &bytes, false),
Ok(bytes.len())
);
s.advance().ok();
}
// The seventh send (with the fin flag) is only partially accepted: two
// bytes fewer than requested.
assert_eq!(
s.client.send_body(&mut s.pipe.client, stream, &bytes, true),
Ok(bytes.len() - 2)
);
s.advance().ok();
let mut recv_buf = vec![0; bytes.len()];
let ev_headers = Event::Headers {
list: req,
more_frames: true,
};
// A single Data event is reported even though several chunks were sent.
assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
// The server reads back the six full chunks followed by the short one.
for _ in 0..total_data_frames {
assert_eq!(
s.recv_body_server(stream, &mut recv_buf),
Ok(bytes.len())
);
}
assert_eq!(
s.recv_body_server(stream, &mut recv_buf),
Ok(bytes.len() - 2)
);
// No Finished event follows — presumably because the last send was only
// partially written, so its fin flag was not transmitted.
assert_eq!(s.poll_server(), Err(Error::Done));
// Only the per-stream blocked counters moved: one sent by the client, one
// received by the server. Connection-level counters stay at zero.
assert_eq!(s.pipe.server.data_blocked_sent_count, 0);
assert_eq!(s.pipe.server.stream_data_blocked_sent_count, 0);
assert_eq!(s.pipe.server.data_blocked_recv_count, 0);
assert_eq!(s.pipe.server.stream_data_blocked_recv_count, 1);
assert_eq!(s.pipe.client.data_blocked_sent_count, 0);
assert_eq!(s.pipe.client.stream_data_blocked_sent_count, 1);
assert_eq!(s.pipe.client.data_blocked_recv_count, 0);
assert_eq!(s.pipe.client.stream_data_blocked_recv_count, 0);
}
#[test]
/// With the HTTP/3 maximum field section size set to 65, a request carrying
/// the header list below makes the server fail with `ExcessiveLoad`, and the
/// same error code is recorded as the server connection's local error.
fn request_max_header_size_limit() {
    let mut quic_config =
        crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(5);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let mut h3_config = Config::new().unwrap();
    h3_config.set_max_field_section_size(65);

    let mut session =
        Session::with_configs(&mut quic_config, &h3_config).unwrap();
    session.handshake().unwrap();

    let oversized_request = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"aaaaaaa", b"aaaaaaaa"),
    ];

    let stream = session
        .client
        .send_request(&mut session.pipe.client, &oversized_request, true)
        .unwrap();
    session.advance().ok();
    assert_eq!(stream, 0);

    assert_eq!(session.poll_server(), Err(Error::ExcessiveLoad));

    let recorded = session.pipe.server.local_error.as_ref().unwrap();
    assert_eq!(recorded.error_code, Error::to_wire(Error::ExcessiveLoad));
}
#[test]
/// The client opens request streams 0, 4, 8, 12 and 16 successfully; the
/// sixth request fails with the transport's `StreamLimit` error wrapped in
/// `Error::TransportError`.
fn transport_error() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let req = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
        Header::new(b"user-agent", b"quiche-test"),
    ];

    for &expected_stream in [0, 4, 8, 12, 16].iter() {
        assert_eq!(
            session
                .client
                .send_request(&mut session.pipe.client, &req, true),
            Ok(expected_stream)
        );
    }

    let over_limit =
        session
            .client
            .send_request(&mut session.pipe.client, &req, true);
    assert_eq!(
        over_limit,
        Err(Error::TransportError(crate::Error::StreamLimit))
    );
}
#[test]
/// Writes a raw DATA frame (type, length 5, payload "hello") onto stream 0
/// as the only content of that stream, and expects the server's `poll()` to
/// report `FrameUnexpected`.
fn data_before_headers() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut scratch = [42; 128];
    let mut encoder = octets::OctetsMut::with_slice(&mut scratch);

    // Frame type.
    let encoded_type = encoder.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
    s.pipe.client.stream_send(0, encoded_type, false).unwrap();

    // Declared payload length.
    let encoded_len = encoder.put_varint(5).unwrap();
    s.pipe.client.stream_send(0, encoded_len, false).unwrap();

    // Payload.
    s.pipe.client.stream_send(0, b"hello", false).unwrap();

    s.advance().ok();

    let outcome = s.server.poll(&mut s.pipe.server);
    assert_eq!(outcome, Err(Error::FrameUnexpected));
}
#[test]
/// Writes a frame header with type 148_764_065_110_560_899 and a declared
/// length of `1 << 24` onto stream 0, followed by 128 bytes of buffer
/// content. The first server `poll()` must fail with `ExcessiveLoad`; a
/// second `poll()` after that error must simply return `Done`.
fn poll_after_error() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut scratch = [42; 128];
    let mut encoder = octets::OctetsMut::with_slice(&mut scratch);

    let encoded_type = encoder.put_varint(148_764_065_110_560_899).unwrap();
    s.pipe.client.stream_send(0, encoded_type, false).unwrap();

    let encoded_len = encoder.put_varint(1 << 24).unwrap();
    s.pipe.client.stream_send(0, encoded_len, false).unwrap();

    // The whole scratch buffer is then sent as the start of the payload.
    s.pipe.client.stream_send(0, &scratch, false).unwrap();

    s.advance().ok();

    let first = s.server.poll(&mut s.pipe.server);
    assert_eq!(first, Err(Error::ExcessiveLoad));

    // Polling again after the error reports nothing further.
    let second = s.server.poll(&mut s.pipe.server);
    assert_eq!(second, Err(Error::Done));
}
#[test]
/// With `initial_max_data` set to 70, a first request is accepted but a
/// second one fails with `StreamBlocked`. After the writable queue is drained
/// and packets are exchanged, stream 4 becomes writable and the request
/// succeeds. The blocked counters then show exactly one DATA_BLOCKED sent by
/// the client and received by the server.
fn headers_blocked() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // First request goes through, the second one is blocked.
    assert_eq!(
        s.client.send_request(&mut s.pipe.client, &headers, true),
        Ok(0)
    );
    assert_eq!(
        s.client.send_request(&mut s.pipe.client, &headers, true),
        Err(Error::StreamBlocked)
    );

    // Drain the client's writable queue until it is empty.
    for expected in [Some(2), Some(6), Some(10), None] {
        assert_eq!(s.pipe.client.stream_writable_next(), expected);
    }

    s.advance().ok();

    // After the exchange, stream 4 is reported writable and can carry the
    // request that was previously blocked.
    assert_eq!(s.pipe.client.stream_writable_next(), Some(4));
    assert_eq!(
        s.client.send_request(&mut s.pipe.client, &headers, true),
        Ok(4)
    );

    let server = &s.pipe.server;
    assert_eq!(server.data_blocked_sent_count, 0);
    assert_eq!(server.stream_data_blocked_sent_count, 0);
    assert_eq!(server.data_blocked_recv_count, 1);
    assert_eq!(server.stream_data_blocked_recv_count, 0);

    let client = &s.pipe.client;
    assert_eq!(client.data_blocked_sent_count, 1);
    assert_eq!(client.stream_data_blocked_sent_count, 0);
    assert_eq!(client.data_blocked_recv_count, 0);
    assert_eq!(client.stream_data_blocked_recv_count, 0);
}
#[test]
/// With `initial_max_data` set to 70, first writes 28 bytes on stream 2 (of
/// which only 23 are accepted), so that the very first request fails with
/// `StreamBlocked` and no stream is writable. After packets are exchanged
/// and the server is polled, streams 2 and 6 become writable and the request
/// succeeds on stream 0. Exactly one DATA_BLOCKED is counted as sent by the
/// client and received by the server.
fn headers_blocked_on_conn() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    // Only 23 of these 28 filler bytes are accepted on stream 2.
    let filler = [42; 28];
    assert_eq!(s.pipe.client.stream_send(2, &filler, false), Ok(23));

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // The request cannot be sent and nothing is writable.
    assert_eq!(
        s.client.send_request(&mut s.pipe.client, &headers, true),
        Err(Error::StreamBlocked)
    );
    assert_eq!(s.pipe.client.stream_writable_next(), None);

    // Exchange packets, let the server process them, then exchange again.
    s.advance().ok();
    assert_eq!(s.poll_server(), Err(Error::Done));
    s.advance().ok();

    // Streams 2 and 6 are writable again and the request now goes through.
    for expected in [Some(2), Some(6)] {
        assert_eq!(s.pipe.client.stream_writable_next(), expected);
    }
    assert_eq!(
        s.client.send_request(&mut s.pipe.client, &headers, true),
        Ok(0)
    );

    let server = &s.pipe.server;
    assert_eq!(server.data_blocked_sent_count, 0);
    assert_eq!(server.stream_data_blocked_sent_count, 0);
    assert_eq!(server.data_blocked_recv_count, 1);
    assert_eq!(server.stream_data_blocked_recv_count, 0);

    let client = &s.pipe.client;
    assert_eq!(client.data_blocked_sent_count, 1);
    assert_eq!(client.stream_data_blocked_sent_count, 0);
    assert_eq!(client.data_blocked_recv_count, 0);
    assert_eq!(client.stream_data_blocked_recv_count, 0);
}
#[test]
/// Exercises server-side `send_body()` when the response stream runs into
/// its stream-level limit (bidirectional stream windows are set to 80).
///
/// The test checks that a truncated write marks the stream as blocked once,
/// that the next outgoing packet starts with a STREAM_DATA_BLOCKED frame
/// carrying the current limit, that retrying while blocked returns `Done`
/// without re-marking the stream, and that the same cycle repeats after a
/// MAX_STREAM_DATA frame raises the limit to 100.
fn send_body_truncation_stream_blocked() {
use crate::test_utils::decode_pkt;
let mut config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config.set_application_protos(&[b"h3"]).unwrap();
// `initial_max_data` is large (10000) while the bidirectional stream
// windows are small (80), far below the 500-byte body sent below.
config.set_initial_max_data(10000); config.set_initial_max_stream_data_bidi_local(80);
config.set_initial_max_stream_data_bidi_remote(80);
config.set_initial_max_stream_data_uni(150);
config.set_initial_max_streams_bidi(100);
config.set_initial_max_streams_uni(5);
config.verify_peer(false);
let h3_config = Config::new().unwrap();
let mut s = Session::with_configs(&mut config, &h3_config).unwrap();
s.handshake().unwrap();
// Complete request (headers + fin) from the client, observed by the server.
let (stream, req) = s.send_request(true).unwrap();
let ev_headers = Event::Headers {
list: req,
more_frames: false,
};
assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
let _ = s.send_response(stream, false).unwrap();
// Sending the response headers alone does not mark any stream as blocked.
assert_eq!(s.pipe.server.streams.blocked().len(), 0);
let d = [42; 500];
let mut off = 0;
// Only 25 of the 500 body bytes are accepted.
let sent = s
.server
.send_body(&mut s.pipe.server, stream, &d, true)
.unwrap();
assert_eq!(sent, 25);
off += sent;
// The truncated write recorded the stream as blocked.
assert_eq!(s.pipe.server.streams.blocked().len(), 1);
// Retrying while still blocked returns Done...
assert_eq!(
s.server
.send_body(&mut s.pipe.server, stream, &d[off..], true),
Err(Error::Done)
);
// ...and does not add a second blocked entry.
assert_eq!(s.pipe.server.streams.blocked().len(), 1);
// The first frame of the server's next packet is STREAM_DATA_BLOCKED for
// stream 0 at the configured limit of 80.
let mut buf = [0; 65535];
let (len, _) = s.pipe.server.send(&mut buf).unwrap();
let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
let mut iter = frames.iter();
assert_eq!(
iter.next(),
Some(&crate::frame::Frame::StreamDataBlocked {
stream_id: 0,
limit: 80,
})
);
// Generating that packet cleared the blocked set.
assert_eq!(s.pipe.server.streams.blocked().len(), 0);
// A further retry still returns Done but does not mark the stream as
// blocked again, so there is nothing more for the server to send.
assert_eq!(
s.server
.send_body(&mut s.pipe.server, stream, &d[off..], true),
Err(Error::Done)
);
assert_eq!(s.pipe.server.streams.blocked().len(), 0);
assert_eq!(s.pipe.server.send(&mut buf), Err(crate::Error::Done));
// Hand-craft a MAX_STREAM_DATA frame raising stream 0's limit to 100 and
// deliver it to the server in a short-header packet.
let frames = [crate::frame::Frame::MaxStreamData {
stream_id: 0,
max: 100,
}];
let pkt_type = crate::packet::Type::Short;
// NOTE(review): 39 is presumably the size of the crafted packet - confirm
// against `send_pkt_to_server`.
assert_eq!(
s.pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Ok(39),
);
// With the higher limit, 18 more bytes are accepted before the stream is
// blocked again.
let sent = s
.server
.send_body(&mut s.pipe.server, stream, &d[off..], true)
.unwrap();
assert_eq!(sent, 18);
assert_eq!(s.pipe.server.streams.blocked().len(), 1);
// NOTE(review): `off` is not advanced by the 18 bytes just sent; the call
// below only checks for Done, so the slice contents do not matter here.
assert_eq!(
s.server
.send_body(&mut s.pipe.server, stream, &d[off..], true),
Err(Error::Done)
);
assert_eq!(s.pipe.server.streams.blocked().len(), 1);
// The next packet again leads with STREAM_DATA_BLOCKED, now at limit 100.
let (len, _) = s.pipe.server.send(&mut buf).unwrap();
let frames = decode_pkt(&mut s.pipe.client, &mut buf[..len]).unwrap();
let mut iter = frames.iter();
assert_eq!(
iter.next(),
Some(&crate::frame::Frame::StreamDataBlocked {
stream_id: 0,
limit: 100,
})
);
}
#[test]
/// Sends an 80000-byte response body with generous flow-control windows and
/// checks that only 11995 bytes are accepted (the test name attributes the
/// truncation to the congestion window). After the client consumes that
/// data, the server's `tx_cap` is still smaller than the unsent remainder,
/// yet the stream is reported writable again.
fn send_body_stream_blocked_by_small_cwnd() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(100000);
    quic_config.set_initial_max_stream_data_bidi_local(100000);
    quic_config.set_initial_max_stream_data_bidi_remote(50000);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    // Full request from the client, observed by the server.
    let (stream, request_headers) = s.send_request(true).unwrap();
    assert_eq!(
        s.poll_server(),
        Ok((stream, Event::Headers {
            list: request_headers,
            more_frames: false,
        }))
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    let _ = s.send_response(stream, false).unwrap();

    // Drain the server's writable queue.
    for expected in [Some(3), Some(7), Some(11), Some(stream), None] {
        assert_eq!(s.pipe.server.stream_writable_next(), expected);
    }

    // Only part of the body is accepted.
    let body = [42; 80000];
    let accepted = s
        .server
        .send_body(&mut s.pipe.server, stream, &body, true)
        .unwrap();
    assert_eq!(accepted, 11995);

    s.advance().ok();

    // The client reads everything that was sent so far.
    let mut sink = [42; 80000];
    assert!(s.poll_client().is_ok());
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut sink), Ok(11995));

    s.advance().ok();

    // The send capacity is still below what remains to be sent...
    let remaining = body.len() - accepted;
    assert!(s.pipe.server.tx_cap < remaining);

    // ...but the stream is nevertheless reported as writable.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
}
#[test]
/// Sends an 11994-byte response body, which is accepted in full and leaves a
/// stream capacity of 3. A further `send_body()` call then returns `Done`
/// (presumably because the remaining capacity cannot hold a DATA frame with
/// any payload - confirm against `send_body`). Once the client has read the
/// data, the stream is reported writable again.
fn send_body_stream_blocked_zero_length() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(100000);
    quic_config.set_initial_max_stream_data_bidi_local(100000);
    quic_config.set_initial_max_stream_data_bidi_remote(50000);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    // Full request from the client, observed by the server.
    let (stream, request_headers) = s.send_request(true).unwrap();
    assert_eq!(
        s.poll_server(),
        Ok((stream, Event::Headers {
            list: request_headers,
            more_frames: false,
        }))
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));

    let _ = s.send_response(stream, false).unwrap();

    // Drain the server's writable queue.
    for expected in [Some(3), Some(7), Some(11), Some(stream), None] {
        assert_eq!(s.pipe.server.stream_writable_next(), expected);
    }

    // The whole body is accepted, leaving 3 units of stream capacity.
    let body = [42; 11994];
    let accepted = s
        .server
        .send_body(&mut s.pipe.server, stream, &body, false)
        .unwrap();
    assert_eq!(accepted, 11994);
    assert_eq!(s.pipe.server.stream_capacity(stream).unwrap(), 3);

    // Another attempt cannot make progress.
    assert_eq!(
        s.server
            .send_body(&mut s.pipe.server, stream, &body, false),
        Err(Error::Done)
    );

    s.advance().ok();

    // The client reads everything that was sent.
    let mut sink = [42; 80000];
    assert!(s.poll_client().is_ok());
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut sink), Ok(11994));

    s.advance().ok();

    // The stream is writable again afterwards.
    assert_eq!(s.pipe.server.stream_writable_next(), Some(0));
}
#[test]
/// Checks empty-body handling in both directions: `send_body()` with an
/// empty slice returns `Done` when `fin` is false and `Ok(0)` when `fin` is
/// true. The receiver sees Headers, then a Data event whose read yields
/// `Done`, then Finished.
fn zero_length_data() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut sink = vec![0; 100];

    // Client -> server.
    let (stream, request_headers) = s.send_request(false).unwrap();

    let without_fin = s.client.send_body(&mut s.pipe.client, 0, b"", false);
    assert_eq!(without_fin, Err(Error::Done));
    let with_fin = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(with_fin, Ok(0));

    s.advance().ok();

    assert_eq!(
        s.poll_server(),
        Ok((stream, Event::Headers {
            list: request_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_server(stream, &mut sink), Err(Error::Done));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server -> client.
    let response_headers = s.send_response(stream, false).unwrap();

    let without_fin = s.server.send_body(&mut s.pipe.server, 0, b"", false);
    assert_eq!(without_fin, Err(Error::Done));
    let with_fin = s.server.send_body(&mut s.pipe.server, 0, b"", true);
    assert_eq!(with_fin, Ok(0));

    s.advance().ok();

    assert_eq!(
        s.poll_client(),
        Ok((stream, Event::Headers {
            list: response_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.recv_body_client(stream, &mut sink), Err(Error::Done));
    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
#[test]
/// With `initial_max_data` set to 69, the request headers are accepted but
/// an empty body with `fin` cannot be sent (`Done`). After the writable
/// queue is drained and packets are exchanged, stream 0 becomes writable and
/// the empty body with `fin` succeeds.
fn zero_length_data_blocked() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(69);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    let headers = vec![
        Header::new(b":method", b"GET"),
        Header::new(b":scheme", b"https"),
        Header::new(b":authority", b"quic.tech"),
        Header::new(b":path", b"/test"),
    ];

    // Headers fit, the terminating empty body does not.
    let opened = s.client.send_request(&mut s.pipe.client, &headers, false);
    assert_eq!(opened, Ok(0));

    let blocked = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(blocked, Err(Error::Done));

    // Drain the client's writable queue until it is empty.
    for expected in [Some(2), Some(6), Some(10), None] {
        assert_eq!(s.pipe.client.stream_writable_next(), expected);
    }

    s.advance().ok();

    // Stream 0 is writable again and the empty body can be sent.
    assert_eq!(s.pipe.client.stream_writable_next(), Some(0));
    let unblocked = s.client.send_body(&mut s.pipe.client, 0, b"", true);
    assert_eq!(unblocked, Ok(0));
}
#[test]
/// With grease disabled and a default HTTP/3 config, both endpoints must
/// still expose raw peer settings (`peer_settings_raw()` is `Some`) once the
/// handshake has completed.
fn empty_settings() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(5);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.set_ack_delay_exponent(8);
    quic_config.grease(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    let seen_by_client = s.client.peer_settings_raw();
    assert!(seen_by_client.is_some());

    let seen_by_server = s.server.peer_settings_raw();
    assert!(seen_by_server.is_some());
}
#[test]
/// With datagrams enabled on the QUIC config, checks on each side that
/// `dgram_enabled_by_peer()` is false after the peer's settings have been
/// delivered but not yet polled, and true once `poll()` has run (returning
/// `Done`).
fn dgram_setting() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.enable_dgram(true, 1000, 1000);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();

    // Only the transport handshake is run here; settings are sent by hand.
    assert_eq!(s.pipe.handshake(), Ok(()));

    // Client settings -> server.
    s.client.send_settings(&mut s.pipe.client).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));

    // Not enabled until the server has polled.
    assert!(!s.server.dgram_enabled_by_peer(&s.pipe.server));
    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::Done));
    assert!(s.server.dgram_enabled_by_peer(&s.pipe.server));

    // Server settings -> client.
    s.server.send_settings(&mut s.pipe.server).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));

    // Not enabled until the client has polled.
    assert!(!s.client.dgram_enabled_by_peer(&s.pipe.client));
    let client_poll = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_poll, Err(Error::Done));
    assert!(s.client.dgram_enabled_by_peer(&s.pipe.client));
}
#[test]
/// The QUIC config here does not call `enable_dgram`, yet the client sends a
/// SETTINGS frame with `h3_datagram: Some(1)` on a manually opened control
/// stream. The server's `poll()` must fail with `SettingsError`.
fn dgram_setting_no_tp() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();

    // Only the transport handshake is run; the control stream is opened by
    // hand so a custom SETTINGS frame can be written to it.
    assert_eq!(s.pipe.handshake(), Ok(()));

    let control_id = s
        .client
        .open_uni_stream(
            &mut s.pipe.client,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.client.control_stream_id = Some(control_id);

    // SETTINGS advertising only the datagram setting.
    let settings = frame::Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: Some(1),
        grease: None,
        additional_settings: Default::default(),
        raw: Default::default(),
    };

    let target = s.client.control_stream_id.unwrap();
    s.send_frame_client(settings, target, false).unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::SettingsError));
}
#[test]
/// Both endpoints open a control stream by hand and write a raw SETTINGS
/// frame containing the single pair (0x2, 1). Going by the test name,
/// identifier 0x2 is an HTTP/2-only setting. Each side's `poll()` must
/// reject the peer's frame with `SettingsError`.
fn settings_h2_prohibited() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    assert_eq!(s.pipe.handshake(), Ok(()));

    // Open the control streams manually, client first.
    let client_control = s
        .client
        .open_uni_stream(
            &mut s.pipe.client,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.client.control_stream_id = Some(client_control);

    let server_control = s
        .server
        .open_uni_stream(
            &mut s.pipe.server,
            stream::HTTP3_CONTROL_STREAM_TYPE_ID,
        )
        .unwrap();
    s.server.control_stream_id = Some(server_control);

    // Raw frame bytes: type, payload length (2), then identifier 0x2 with
    // value 1.
    let payload_len = 2u64;
    let raw_settings = [
        frame::SETTINGS_FRAME_TYPE_ID as u8,
        payload_len as u8,
        0x2,
        1,
    ];

    let client_target = s.client.control_stream_id.unwrap();
    s.send_arbitrary_stream_data_client(&raw_settings, client_target, false)
        .unwrap();

    let server_target = s.server.control_stream_id.unwrap();
    s.send_arbitrary_stream_data_server(&raw_settings, server_target, false)
        .unwrap();

    assert_eq!(s.pipe.advance(), Ok(()));

    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::SettingsError));

    let client_poll = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_poll, Err(Error::SettingsError));
}
#[test]
/// `Config::set_additional_settings()` must return `SettingsError` for each
/// of the setting identifiers listed below.
fn set_prohibited_additional_settings() {
    let mut h3_config = Config::new().unwrap();

    // Checked in the same order as they are listed.
    let prohibited = [
        frame::SETTINGS_QPACK_MAX_TABLE_CAPACITY,
        frame::SETTINGS_MAX_FIELD_SECTION_SIZE,
        frame::SETTINGS_QPACK_BLOCKED_STREAMS,
        frame::SETTINGS_ENABLE_CONNECT_PROTOCOL,
        frame::SETTINGS_H3_DATAGRAM,
    ];

    for identifier in prohibited {
        assert_eq!(
            h3_config.set_additional_settings(vec![(identifier, 43)]),
            Err(Error::SettingsError)
        );
    }
}
#[test]
/// Configures the additional settings (42, 43) and (44, 45) with grease
/// disabled, exchanges SETTINGS in both directions, and checks that each
/// side's `peer_settings_raw()` is exactly those two pairs.
fn set_additional_settings() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(70);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.grease(false);

    let mut h3_config = Config::new().unwrap();
    h3_config
        .set_additional_settings(vec![(42, 43), (44, 45)])
        .unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    assert_eq!(s.pipe.handshake(), Ok(()));
    assert_eq!(s.pipe.advance(), Ok(()));

    // Client settings -> server.
    s.client.send_settings(&mut s.pipe.client).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));
    let server_poll = s.server.poll(&mut s.pipe.server);
    assert_eq!(server_poll, Err(Error::Done));

    // Server settings -> client.
    s.server.send_settings(&mut s.pipe.server).unwrap();
    assert_eq!(s.pipe.advance(), Ok(()));
    let client_poll = s.client.poll(&mut s.pipe.client);
    assert_eq!(client_poll, Err(Error::Done));

    // Both sides see exactly the configured pairs.
    let expected = [(42, 43), (44, 45)];
    assert_eq!(s.server.peer_settings_raw(), Some(&expected[..]));
    assert_eq!(s.client.peer_settings_raw(), Some(&expected[..]));
}
#[test]
/// Sends one datagram on flow 0 in each direction. `poll()` on the receiver
/// reports `Done`, while the datagram itself is read through the dedicated
/// receive helper and yields `(11, 0, 1)`.
fn single_dgram() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut dgram_buf = [0; 65535];
    let expected = (11, 0, 1);

    // Client -> server.
    s.send_dgram_client(0).unwrap();
    assert_eq!(s.poll_server(), Err(Error::Done));
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected));

    // Server -> client.
    s.send_dgram_server(0).unwrap();
    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(expected));
}
#[test]
/// Sends three datagrams on flow 0 in each direction. The receiver reads
/// exactly three, each yielding `(11, 0, 1)`, and a fourth read returns
/// `Done`.
fn multiple_dgram() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut dgram_buf = [0; 65535];
    let expected = (11, 0, 1);

    // Client -> server.
    for _ in 0..3 {
        s.send_dgram_client(0).unwrap();
    }

    assert_eq!(s.poll_server(), Err(Error::Done));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected));
    }
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Err(Error::Done));

    // Server -> client.
    for _ in 0..3 {
        s.send_dgram_server(0).unwrap();
    }

    assert_eq!(s.poll_client(), Err(Error::Done));
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(expected));
    }
    assert_eq!(s.recv_dgram_client(&mut dgram_buf), Err(Error::Done));
}
#[test]
/// The client sends five datagrams on flow 0, but the server can read only
/// three of them before `Done` is returned. The remaining two are presumably
/// dropped by a bounded datagram queue in the default session config -
/// confirm against `Session::new`.
fn multiple_dgram_overflow() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    let mut dgram_buf = [0; 65535];
    let expected = (11, 0, 1);

    // Five sent...
    for _ in 0..5 {
        s.send_dgram_client(0).unwrap();
    }

    assert_eq!(s.poll_server(), Err(Error::Done));

    // ...only three delivered.
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected));
    }
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Err(Error::Done));
}
#[test]
/// Sends a request with a body plus one datagram, then polls the server
/// without ever reading the body or the datagram. Polling must yield
/// Headers, then Data, and then `Done`.
fn poll_datagram_cycling_no_read() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    // Request headers and body, followed by a datagram on flow 0.
    let (stream, request_headers) = s.send_request(false).unwrap();
    s.send_body_client(stream, true).unwrap();
    s.send_dgram_client(0).unwrap();

    assert_eq!(
        s.poll_server(),
        Ok((stream, Event::Headers {
            list: request_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));

    // Nothing was read, and nothing further is reported.
    assert_eq!(s.poll_server(), Err(Error::Done));
}
#[test]
/// In each direction, sends headers, a body and one datagram. The receiver
/// polls Headers, Data and `Done`, reads the datagram (after which `poll()`
/// still returns `Done`), and only gets Finished once the body has been
/// read.
fn poll_datagram_single_read() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    let mut dgram_buf = [0; 65535];
    let expected_dgram = (11, 0, 1);

    // Client -> server: headers, body, then one datagram.
    let (stream, request_headers) = s.send_request(false).unwrap();
    let request_body = s.send_body_client(stream, true).unwrap();
    let mut request_sink = vec![0; request_body.len()];

    s.send_dgram_client(0).unwrap();

    assert_eq!(
        s.poll_server(),
        Ok((stream, Event::Headers {
            list: request_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Reading the datagram does not produce a new event.
    assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(expected_dgram));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Reading the body does: the stream is finished.
    assert_eq!(
        s.recv_body_server(stream, &mut request_sink),
        Ok(request_body.len())
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Server -> client: same sequence for the response.
    let response_headers = s.send_response(stream, false).unwrap();
    let response_body = s.send_body_server(stream, true).unwrap();
    let mut response_sink = vec![0; response_body.len()];

    s.send_dgram_server(0).unwrap();

    assert_eq!(
        s.poll_client(),
        Ok((stream, Event::Headers {
            list: response_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.poll_client(), Err(Error::Done));

    assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(expected_dgram));
    assert_eq!(s.poll_client(), Err(Error::Done));

    assert_eq!(
        s.recv_body_client(stream, &mut response_sink),
        Ok(response_body.len())
    );
    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));
}
#[test]
/// In each direction, sends headers, a body, five datagrams on flow 0 and
/// five on flow 2. The receiver reads three flow-0 datagrams, then the body
/// (which produces Finished), then the remaining two flow-0 datagrams and
/// all five flow-2 datagrams. Every `poll()` after a datagram read returns
/// `Done`. The second element of each received tuple matches the flow ID
/// used when sending.
fn poll_datagram_multi_read() {
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let h3_config = Config::new().unwrap();

    let mut s = Session::with_configs(&mut quic_config, &h3_config).unwrap();
    s.handshake().unwrap();

    let mut dgram_buf = [0; 65535];
    let on_flow_0 = (11, 0, 1);
    let on_flow_2 = (11, 2, 1);

    // ---- Client -> server ----
    let (stream, request_headers) = s.send_request(false).unwrap();
    let request_body = s.send_body_client(stream, true).unwrap();
    let mut request_sink = vec![0; request_body.len()];

    // Five datagrams on flow 0 followed by five on flow 2.
    for _ in 0..5 {
        s.send_dgram_client(0).unwrap();
    }
    for _ in 0..5 {
        s.send_dgram_client(2).unwrap();
    }

    assert_eq!(
        s.poll_server(),
        Ok((stream, Event::Headers {
            list: request_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Read three flow-0 datagrams before touching the body.
    for _ in 0..3 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }

    // Reading the body finishes the stream.
    assert_eq!(
        s.recv_body_server(stream, &mut request_sink),
        Ok(request_body.len())
    );
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // The remaining two flow-0 datagrams, then all five on flow 2.
    for _ in 0..2 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }
    for _ in 0..5 {
        assert_eq!(s.recv_dgram_server(&mut dgram_buf), Ok(on_flow_2));
        assert_eq!(s.poll_server(), Err(Error::Done));
    }

    // ---- Server -> client ----
    let response_headers = s.send_response(stream, false).unwrap();
    let response_body = s.send_body_server(stream, true).unwrap();
    let mut response_sink = vec![0; response_body.len()];

    for _ in 0..5 {
        s.send_dgram_server(0).unwrap();
    }
    for _ in 0..5 {
        s.send_dgram_server(2).unwrap();
    }

    assert_eq!(
        s.poll_client(),
        Ok((stream, Event::Headers {
            list: response_headers,
            more_frames: true,
        }))
    );
    assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
    assert_eq!(s.poll_client(), Err(Error::Done));

    for _ in 0..3 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }

    assert_eq!(
        s.recv_body_client(stream, &mut response_sink),
        Ok(response_body.len())
    );
    assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_client(), Err(Error::Done));

    for _ in 0..2 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(on_flow_0));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }
    for _ in 0..5 {
        assert_eq!(s.recv_dgram_client(&mut dgram_buf), Ok(on_flow_2));
        assert_eq!(s.poll_client(), Err(Error::Done));
    }
}
#[test]
/// Opening a grease stream from the client must not surface any event:
/// `poll()` returns `Done` on both endpoints before and after the stream is
/// opened and packets are exchanged.
fn finished_is_for_requests() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Nothing pending right after the handshake.
    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));

    let opened = s.client.open_grease_stream(&mut s.pipe.client);
    assert_eq!(opened, Ok(()));
    assert_eq!(s.pipe.advance(), Ok(()));

    // Still nothing after the grease stream has been exchanged.
    assert_eq!(s.poll_client(), Err(Error::Done));
    assert_eq!(s.poll_server(), Err(Error::Done));
}
#[test]
/// Once the request body has been read and `Finished` has been reported,
/// the server gets neither more body data nor any further event.
fn finished_once() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    let (stream_id, request) = session.send_request(false).unwrap();
    let sent_body = session.send_body_client(stream_id, true).unwrap();
    let body_len = sent_body.len();

    let mut body_buf = vec![0; body_len];

    let expected_headers = Event::Headers {
        list: request,
        more_frames: true,
    };

    // Headers first, then a data notification for the body.
    assert_eq!(session.poll_server(), Ok((stream_id, expected_headers)));
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));

    // Draining the body is followed by exactly one `Finished`.
    let first_read = session.recv_body_server(stream_id, &mut body_buf);
    assert_eq!(first_read, Ok(body_len));
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Finished)));

    // A second read yields nothing and no event is reported again.
    let second_read = session.recv_body_server(stream_id, &mut body_buf);
    assert_eq!(second_read, Err(Error::Done));
    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// Checks when the server-side `Data` event is reported again for a request
/// stream: after partial reads of a DATA frame, after additional body sends,
/// when bodies are followed by trailers, and when a DATA frame header arrives
/// before any of its payload.
fn data_event_rearm() {
let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut s = Session::new().unwrap();
s.handshake().unwrap();
// Request 1: headers only for now.
let (r1_id, r1_hdrs) = s.send_request(false).unwrap();
let mut recv_buf = vec![0; bytes.len()];
let r1_ev_headers = Event::Headers {
list: r1_hdrs,
more_frames: true,
};
// Hand-craft a DATA frame header announcing `bytes.len()` bytes of payload
// and write it straight to the client's transport stream, followed by only
// the first 5 payload bytes.
{
let mut d = [42; 10];
let mut b = octets::OctetsMut::with_slice(&mut d);
b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
b.put_varint(bytes.len() as u64).unwrap();
let off = b.off();
s.pipe.client.stream_send(r1_id, &d[..off], false).unwrap();
assert_eq!(
s.pipe.client.stream_send(r1_id, &bytes[..5], false),
Ok(5)
);
s.advance().ok();
}
// Server sees the headers and a single Data event, then reads the 5 bytes
// that are available.
assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_headers)));
assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
// The remaining 5 payload bytes arrive: Data is reported once more.
assert_eq!(s.pipe.client.stream_send(r1_id, &bytes[5..], false), Ok(5));
s.advance().ok();
assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(5));
assert_eq!(s.poll_server(), Err(Error::Done));
// A further body sent through the session helper also triggers one Data
// event on request 1.
let r1_body = s.send_body_client(r1_id, false).unwrap();
assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
// Request 2: headers and body are sent before the server polls.
let (r2_id, r2_hdrs) = s.send_request(false).unwrap();
let r2_ev_headers = Event::Headers {
list: r2_hdrs,
more_frames: true,
};
let r2_body = s.send_body_client(r2_id, false).unwrap();
s.advance().ok();
assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_headers)));
assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
assert_eq!(s.poll_server(), Err(Error::Done));
// Request 1: more body followed by trailers sent with `true` as the last
// `send_headers` argument (presumably the fin flag — the expected events
// below are trailers with `more_frames: false` and then Finished).
let r1_body = s.send_body_client(r1_id, false).unwrap();
let trailers = vec![Header::new(b"hello", b"world")];
s.client
.send_headers(&mut s.pipe.client, r1_id, &trailers, true)
.unwrap();
let r1_ev_trailers = Event::Headers {
list: trailers.clone(),
more_frames: false,
};
s.advance().ok();
assert_eq!(s.poll_server(), Ok((r1_id, Event::Data)));
assert_eq!(s.recv_body_server(r1_id, &mut recv_buf), Ok(r1_body.len()));
assert_eq!(s.poll_server(), Ok((r1_id, r1_ev_trailers)));
assert_eq!(s.poll_server(), Ok((r1_id, Event::Finished)));
assert_eq!(s.poll_server(), Err(Error::Done));
// Request 2: more body followed by trailers sent with `false`; the
// trailers are expected with `more_frames: true` and no Finished follows.
let r2_body = s.send_body_client(r2_id, false).unwrap();
s.client
.send_headers(&mut s.pipe.client, r2_id, &trailers, false)
.unwrap();
let r2_ev_trailers = Event::Headers {
list: trailers,
more_frames: true,
};
s.advance().ok();
assert_eq!(s.poll_server(), Ok((r2_id, Event::Data)));
assert_eq!(s.recv_body_server(r2_id, &mut recv_buf), Ok(r2_body.len()));
assert_eq!(s.poll_server(), Ok((r2_id, r2_ev_trailers)));
assert_eq!(s.poll_server(), Err(Error::Done));
// Request 3: this time only the DATA frame header is written, with no
// payload at all.
let (r3_id, r3_hdrs) = s.send_request(false).unwrap();
let r3_ev_headers = Event::Headers {
list: r3_hdrs,
more_frames: true,
};
{
let mut d = [42; 10];
let mut b = octets::OctetsMut::with_slice(&mut d);
b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
b.put_varint(bytes.len() as u64).unwrap();
let off = b.off();
s.pipe.client.stream_send(r3_id, &d[..off], false).unwrap();
s.advance().ok();
}
// Data is reported for the bare frame header, but reading yields
// `Error::Done` because no payload has been sent yet.
assert_eq!(s.poll_server(), Ok((r3_id, r3_ev_headers)));
assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Err(Error::Done));
// First half of the payload: Data is reported again and 5 bytes are read.
assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[..5], false), Ok(5));
s.advance().ok();
assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
// Second half of the payload: same pattern.
assert_eq!(s.pipe.client.stream_send(r3_id, &bytes[5..], false), Ok(5));
s.advance().ok();
assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(5));
// Three bodies sent back to back without any read in between result in a
// single Data event.
let body = s.send_body_client(r3_id, false).unwrap();
s.send_body_client(r3_id, false).unwrap();
s.send_body_client(r3_id, false).unwrap();
assert_eq!(s.poll_server(), Ok((r3_id, Event::Data)));
assert_eq!(s.poll_server(), Err(Error::Done));
// Finish with a zero-length DATA frame, passing `true` as the last
// `stream_send` argument (presumably the fin flag).
{
let mut d = [42; 10];
let mut b = octets::OctetsMut::with_slice(&mut d);
b.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
b.put_varint(0).unwrap();
let off = b.off();
s.pipe.client.stream_send(r3_id, &d[..off], true).unwrap();
s.advance().ok();
}
// A buffer large enough for all three bodies receives them in one read.
let mut recv_buf = vec![0; bytes.len() * 3];
assert_eq!(s.recv_body_server(r3_id, &mut recv_buf), Ok(body.len() * 3));
}
#[test]
/// Sends datagrams on flows 0 and 2 alongside a request with a body and
/// checks that the server can read every datagram while `poll_server()` only
/// reports the request's own stream events. Finishes by checking the
/// per-connection and per-path datagram counters.
fn dgram_event_rearm() {
    let mut dgram_buf = [0; 65535];

    // Transport configuration with datagram support switched on.
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(&[b"h3"]).unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(100);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);
    quic_config.enable_dgram(true, 100, 100);

    let http3_config = Config::new().unwrap();
    let mut session =
        Session::with_configs(&mut quic_config, &http3_config).unwrap();
    session.handshake().unwrap();

    // Expected `recv_dgram_server()` results for a datagram on each flow.
    let on_flow_0 = (11, 0, 1);
    let on_flow_2 = (11, 2, 1);

    // One request with a body...
    let (stream_id, request) = session.send_request(false).unwrap();
    let sent_body = session.send_body_client(stream_id, true).unwrap();
    let body_len = sent_body.len();
    let mut body_buf = vec![0; body_len];

    let expected_headers = Event::Headers {
        list: request,
        more_frames: true,
    };

    // ...plus two datagrams on flow 0 and two on flow 2.
    for flow_id in [0, 0, 2, 2] {
        session.send_dgram_client(flow_id).unwrap();
    }

    // Only the stream events show up when polling.
    assert_eq!(session.poll_server(), Ok((stream_id, expected_headers)));
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // Each datagram can be read, and polling stays quiet after every read.
    for expected in [on_flow_0, on_flow_0, on_flow_2, on_flow_2] {
        assert_eq!(session.recv_dgram_server(&mut dgram_buf), Ok(expected));
        assert_eq!(session.poll_server(), Err(Error::Done));
    }

    // A second batch, one datagram per flow, behaves the same way.
    for flow_id in [0, 2] {
        session.send_dgram_client(flow_id).unwrap();
    }
    assert_eq!(session.poll_server(), Err(Error::Done));

    for expected in [on_flow_0, on_flow_2] {
        assert_eq!(session.recv_dgram_server(&mut dgram_buf), Ok(expected));
        assert_eq!(session.poll_server(), Err(Error::Done));
    }

    // The request body is still there to be read and finishes the stream.
    assert_eq!(
        session.recv_body_server(stream_id, &mut body_buf),
        Ok(body_len)
    );
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Finished)));

    // Six datagrams went client -> server, none the other way.
    assert_eq!(session.pipe.client.dgram_sent_count, 6);
    assert_eq!(session.pipe.client.dgram_recv_count, 0);
    assert_eq!(session.pipe.server.dgram_sent_count, 0);
    assert_eq!(session.pipe.server.dgram_recv_count, 6);

    // The active paths carry the same counts.
    let server_path = session.pipe.server.paths.get_active().expect("no active");
    let client_path = session.pipe.client.paths.get_active().expect("no active");
    assert_eq!(client_path.dgram_sent_count, 6);
    assert_eq!(client_path.dgram_recv_count, 0);
    assert_eq!(server_path.dgram_sent_count, 0);
    assert_eq!(server_path.dgram_recv_count, 6);
}
#[test]
/// After a completed request/response exchange, a RESET_STREAM frame sent to
/// the server is reported as `Event::Reset` exactly once; sending the same
/// frame again produces no further event.
fn reset_stream() {
    let mut pkt_buf = [0; 65535];

    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Request headers reach the server.
    let (stream_id, request) = session.send_request(false).unwrap();
    let request_headers = Event::Headers {
        list: request,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((stream_id, request_headers)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // The response reaches the client and finishes the stream there.
    let response = session.send_response(stream_id, true).unwrap();
    let response_headers = Event::Headers {
        list: response,
        more_frames: false,
    };
    assert_eq!(session.poll_client(), Ok((stream_id, response_headers)));
    assert_eq!(session.poll_client(), Ok((stream_id, Event::Finished)));
    assert_eq!(session.poll_client(), Err(Error::Done));

    // Inject a RESET_STREAM frame for the request stream towards the server.
    let reset_frames = [crate::frame::Frame::ResetStream {
        stream_id,
        error_code: 42,
        final_size: 68,
    }];

    let first_send = session.pipe.send_pkt_to_server(
        crate::packet::Type::Short,
        &reset_frames,
        &mut pkt_buf,
    );
    assert_eq!(first_send, Ok(39));

    // The reset is surfaced once, carrying the error code.
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Reset(42))));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // The very same frame sent again does not produce another event.
    let second_send = session.pipe.send_pkt_to_server(
        crate::packet::Type::Short,
        &reset_frames,
        &mut pkt_buf,
    );
    assert_eq!(second_send, Ok(39));

    assert_eq!(session.poll_server(), Err(Error::Done));
}
#[test]
/// The client shuts down the write side of a request stream (error code 42)
/// in two situations: after the server's response has already finished, and
/// while the server's response is still open. In both cases the server gets
/// `Event::Reset(42)`; the test also checks when each endpoint reports the
/// stream as collected.
fn client_shutdown_write_server_fin() {
    let mut body_buf = [0; 65535];

    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // ---- Case 1: the response is finished before the client resets. ----
    let (first_id, first_req) = session.send_request(false).unwrap();
    let first_req_headers = Event::Headers {
        list: first_req,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((first_id, first_req_headers)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    let first_resp = session.send_response(first_id, true).unwrap();
    let first_resp_headers = Event::Headers {
        list: first_resp,
        more_frames: false,
    };
    assert_eq!(session.poll_client(), Ok((first_id, first_resp_headers)));
    assert_eq!(session.poll_client(), Ok((first_id, Event::Finished)));
    assert_eq!(session.poll_client(), Err(Error::Done));

    let first_shutdown = session.pipe.client.stream_shutdown(
        first_id,
        crate::Shutdown::Write,
        42,
    );
    assert_eq!(first_shutdown, Ok(()));
    assert_eq!(session.advance(), Ok(()));

    assert_eq!(session.poll_server(), Ok((first_id, Event::Reset(42))));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // Both endpoints are done with the stream.
    assert!(session.pipe.server.streams.is_collected(first_id));
    assert!(session.pipe.client.streams.is_collected(first_id));

    // ---- Case 2: the response is still open when the client resets. ----
    let (second_id, second_req) = session.send_request(false).unwrap();
    let second_req_headers = Event::Headers {
        list: second_req,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((second_id, second_req_headers)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    let second_resp = session.send_response(second_id, false).unwrap();
    let second_resp_headers = Event::Headers {
        list: second_resp,
        more_frames: true,
    };
    assert_eq!(session.poll_client(), Ok((second_id, second_resp_headers)));
    assert_eq!(session.poll_client(), Err(Error::Done));

    let second_shutdown = session.pipe.client.stream_shutdown(
        second_id,
        crate::Shutdown::Write,
        42,
    );
    assert_eq!(second_shutdown, Ok(()));
    assert_eq!(session.advance(), Ok(()));

    assert_eq!(session.poll_server(), Ok((second_id, Event::Reset(42))));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // The server can still complete its response body after the reset.
    session.send_body_server(second_id, true).unwrap();

    // The server side is collected now; the client still has the body to read.
    assert!(session.pipe.server.streams.is_collected(second_id));
    assert!(!session.pipe.client.streams.is_collected(second_id));

    assert_eq!(session.poll_client(), Ok((second_id, Event::Data)));
    session.recv_body_client(second_id, &mut body_buf).unwrap();
    assert_eq!(session.poll_client(), Ok((second_id, Event::Finished)));
    assert_eq!(session.poll_client(), Err(Error::Done));

    // Once the body has been consumed the client collects the stream too.
    assert!(session.pipe.client.streams.is_collected(second_id));
}
#[test]
/// The client shuts down the read side of a request stream (error code 42)
/// while the response is still open. The server gets no event for it, but
/// its next body send fails with `StreamStopped(42)`. The client then
/// finishes its own side with an empty body, after which both endpoints
/// report the stream as collected.
fn client_shutdown_read() {
    let mut body_buf = [0; 65535];

    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Request headers reach the server.
    let (stream_id, request) = session.send_request(false).unwrap();
    let request_headers = Event::Headers {
        list: request,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((stream_id, request_headers)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // Response headers reach the client; the response stays open.
    let response = session.send_response(stream_id, false).unwrap();
    let response_headers = Event::Headers {
        list: response,
        more_frames: true,
    };
    assert_eq!(session.poll_client(), Ok((stream_id, response_headers)));
    assert_eq!(session.poll_client(), Err(Error::Done));

    // The client stops reading.
    let shutdown = session.pipe.client.stream_shutdown(
        stream_id,
        crate::Shutdown::Read,
        42,
    );
    assert_eq!(shutdown, Ok(()));
    assert_eq!(session.advance(), Ok(()));

    // No HTTP/3 event is raised on the server for this...
    assert_eq!(session.poll_server(), Err(Error::Done));

    // ...but the stream is listed as writable on the server connection, and
    // writing to it reports the stop with the client's error code.
    let listed_writable =
        session.pipe.server.writable().any(|id| id == stream_id);
    assert!(listed_writable);

    assert_eq!(
        session.send_body_server(stream_id, false),
        Err(Error::TransportError(crate::Error::StreamStopped(42)))
    );

    // The client closes its sending side with an empty, final body.
    let empty_fin =
        session
            .client
            .send_body(&mut session.pipe.client, stream_id, &[], true);
    assert_eq!(empty_fin, Ok(0));
    assert_eq!(session.advance(), Ok(()));

    // The server sees Data with nothing to read, then Finished.
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));
    assert_eq!(
        session.recv_body_server(stream_id, &mut body_buf),
        Err(Error::Done)
    );
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Finished)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // Both endpoints are done with the stream.
    assert!(session.pipe.client.streams.is_collected(stream_id));
    assert!(session.pipe.server.streams.is_collected(stream_id));
}
#[test]
/// Client-initiated reset as seen by the server, in two variants:
///
/// 1. The request is left open and then reset: the server reports only
///    `Event::Reset(0)`.
/// 2. The request is sent complete and then reset: the server reports the
///    headers followed by `Event::Finished`.
fn reset_finished_at_server() {
    let mut s = Session::new().unwrap();
    s.handshake().unwrap();

    // Variant 1: open request, then the client shuts down its write side.
    let (stream, _req) = s.send_request(false).unwrap();

    // Use the id returned by `send_request()` rather than a literal stream
    // id, so the shutdown always targets the stream the assertions below
    // check (and matches how the sibling tests address their streams).
    assert_eq!(
        s.pipe
            .client
            .stream_shutdown(stream, crate::Shutdown::Write, 0),
        Ok(())
    );
    assert_eq!(s.pipe.advance(), Ok(()));

    // The server receives just the reset.
    assert_eq!(s.poll_server(), Ok((stream, Event::Reset(0))));
    assert_eq!(s.poll_server(), Err(Error::Done));

    // Variant 2: complete request, then the client shuts down its write side.
    let (stream, req) = s.send_request(true).unwrap();

    // NOTE(review): no `advance()` follows this shutdown, so whether the
    // reset reaches the server before the polls below depends on helpers
    // outside this function — confirm this is intentional.
    assert_eq!(
        s.pipe
            .client
            .stream_shutdown(stream, crate::Shutdown::Write, 0),
        Ok(())
    );

    let ev_headers = Event::Headers {
        list: req,
        more_frames: false,
    };

    // The server reports the headers and the end of the request; no reset.
    assert_eq!(s.poll_server(), Ok((stream, ev_headers)));
    assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
    assert_eq!(s.poll_server(), Err(Error::Done));
}
#[test]
/// The client resets a request whose body the server has been notified about
/// but has not read. Polling reports `Event::Reset(0)` and the server
/// connection ends up with no readable streams.
fn reset_finished_at_server_with_data_pending() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Open request followed by a body.
    let (stream_id, request) = session.send_request(false).unwrap();
    assert!(session.send_body_client(stream_id, false).is_ok());
    assert_eq!(session.pipe.advance(), Ok(()));

    let expected_headers = Event::Headers {
        list: request,
        more_frames: true,
    };

    // The server is told about headers and data, but does not read the body.
    assert_eq!(session.poll_server(), Ok((stream_id, expected_headers)));
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));

    // The client shuts down its write side.
    let shutdown = session.pipe.client.stream_shutdown(
        stream_id,
        crate::Shutdown::Write,
        0,
    );
    assert_eq!(shutdown, Ok(()));
    assert_eq!(session.pipe.advance(), Ok(()));

    // The reset is reported, and nothing else.
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Reset(0))));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // No stream is left readable on the server connection.
    let readable_streams = session.pipe.server.readable().len();
    assert_eq!(readable_streams, 0);
}
#[test]
/// Same setup as the pending-data reset test, except that the server tries
/// to read the body after the reset instead of polling first. The read fails
/// with `StreamReset(0)`, the following poll reports nothing, and no stream
/// is left readable.
fn reset_finished_at_server_with_data_pending_2() {
    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // Open request followed by a body.
    let (stream_id, request) = session.send_request(false).unwrap();
    assert!(session.send_body_client(stream_id, false).is_ok());
    assert_eq!(session.pipe.advance(), Ok(()));

    let expected_headers = Event::Headers {
        list: request,
        more_frames: true,
    };

    // The server is told about headers and data, but does not read the body.
    assert_eq!(session.poll_server(), Ok((stream_id, expected_headers)));
    assert_eq!(session.poll_server(), Ok((stream_id, Event::Data)));

    // The client shuts down its write side.
    let shutdown = session.pipe.client.stream_shutdown(
        stream_id,
        crate::Shutdown::Write,
        0,
    );
    assert_eq!(shutdown, Ok(()));
    assert_eq!(session.pipe.advance(), Ok(()));

    // Reading the body now surfaces the transport-level reset.
    let mut scratch = [0; 100];
    let read = session.recv_body_server(stream_id, &mut scratch);
    assert_eq!(
        read,
        Err(Error::TransportError(crate::Error::StreamReset(0)))
    );

    // No event follows, and no stream is left readable.
    assert_eq!(session.poll_server(), Err(Error::Done));

    let readable_streams = session.pipe.server.readable().len();
    assert_eq!(readable_streams, 0);
}
#[test]
/// Resets around a response, in two variants:
///
/// 1. The server leaves its response open and shuts down its write side: the
///    client reports `Event::Reset(0)`.
/// 2. The server completes its response, after which a RESET_STREAM frame is
///    injected: the client still reports the headers and `Event::Finished`.
fn reset_finished_at_client() {
    let mut pkt_buf = [0; 65535];

    let mut session = Session::new().unwrap();
    session.handshake().unwrap();

    // ---- Variant 1: open response, then reset by the server. ----
    let (first_id, first_req) = session.send_request(false).unwrap();
    let first_req_headers = Event::Headers {
        list: first_req,
        more_frames: true,
    };
    assert_eq!(session.poll_server(), Ok((first_id, first_req_headers)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    session.send_response(first_id, false).unwrap();
    assert_eq!(session.pipe.advance(), Ok(()));

    let server_shutdown = session.pipe.server.stream_shutdown(
        first_id,
        crate::Shutdown::Write,
        0,
    );
    assert_eq!(server_shutdown, Ok(()));
    assert_eq!(session.pipe.advance(), Ok(()));

    // The client reports the reset; the server has nothing to report.
    assert_eq!(session.poll_client(), Ok((first_id, Event::Reset(0))));
    assert_eq!(session.poll_server(), Err(Error::Done));

    // ---- Variant 2: complete response, then a RESET_STREAM frame. ----
    let (second_id, second_req) = session.send_request(true).unwrap();
    let second_req_headers = Event::Headers {
        list: second_req,
        more_frames: false,
    };
    assert_eq!(session.poll_server(), Ok((second_id, second_req_headers)));
    assert_eq!(session.poll_server(), Ok((second_id, Event::Finished)));
    assert_eq!(session.poll_server(), Err(Error::Done));

    let second_resp = session.send_response(second_id, true).unwrap();
    assert_eq!(session.pipe.advance(), Ok(()));

    // The frame is injected through `send_pkt_to_server()`.
    let reset_frames = [crate::frame::Frame::ResetStream {
        stream_id: second_id,
        error_code: 42,
        final_size: 68,
    }];

    let injected = session.pipe.send_pkt_to_server(
        crate::packet::Type::Short,
        &reset_frames,
        &mut pkt_buf,
    );
    assert_eq!(injected, Ok(39));
    assert_eq!(session.pipe.advance(), Ok(()));

    // The client sees the complete response and no reset event.
    let second_resp_headers = Event::Headers {
        list: second_resp,
        more_frames: false,
    };
    assert_eq!(session.poll_client(), Ok((second_id, second_resp_headers)));
    assert_eq!(session.poll_client(), Ok((second_id, Event::Finished)));
    assert_eq!(session.poll_client(), Err(Error::Done));
}
}
// The `ffi` module is compiled only when the `ffi` feature is enabled.
#[cfg(feature = "ffi")]
mod ffi;
// `frame` is exported (but hidden from the generated docs) when the
// `internal` feature is enabled; otherwise it stays private to this module.
#[cfg(feature = "internal")]
#[doc(hidden)]
pub mod frame;
#[cfg(not(feature = "internal"))]
mod frame;
// `qpack` is always public, but hidden from the generated docs.
#[doc(hidden)]
pub mod qpack;
// `stream` is private to this module.
mod stream;