use std::collections::HashMap;
use std::collections::VecDeque;
use crate::octets;
/// Application protocol identifier for this module: one length byte (`0x05`)
/// followed by the five ASCII bytes `h3-23`.
///
/// NOTE(review): presumably meant to be handed to the transport configuration
/// as its application-protocol list -- confirm against callers.
pub const APPLICATION_PROTOCOL: &[u8] = b"\x05h3-23";
/// Shorthand for a `std::result::Result` whose error type is this module's
/// `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// Errors returned by the functions in this module.
///
/// `to_wire()` maps every variant to the numeric code used when closing the
/// connection, and `to_c()` maps it to a negative `libc::ssize_t`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Error {
/// Nothing (more) to do; e.g. `poll()` returns this when no event is
/// pending. Wire code `0x100`.
Done,
/// A buffer was too small; produced from `octets::BufferTooShortError`.
BufferTooShort,
/// Wire code `0x101`.
GeneralProtocolError,
/// Wire code `0x102`. Also returned when header encoding fails.
InternalError,
/// Wire code `0x10C`.
RequestCancelled,
/// Wire code `0x10D`.
RequestIncomplete,
/// Wire code `0x10F`.
ConnectError,
/// Wire code `0x107`. Returned when a decoded header list is too large.
ExcessiveLoad,
/// Wire code `0x110`.
VersionFallback,
/// Wire code `0x108`. Returned when no more stream IDs are available or
/// when a MAX_PUSH_ID frame lowers the limit.
IdError,
/// Wire code `0x103`. Returned when the peer opens a duplicate critical
/// stream, or a push stream towards a server.
StreamCreationError,
/// Wire code `0x104`. Returned when a critical peer stream is finished.
ClosedCriticalStream,
/// Wire code `0x10E`.
EarlyResponse,
/// Wire code `0x10A`.
MissingSettings,
/// Wire code `0x105`. Returned when a frame is not allowed on the stream
/// or for the endpoint role it was received on.
FrameUnexpected,
/// Wire code `0x10B`.
RequestRejected,
/// Wire code `0x109`.
SettingsError,
/// Wire code `0x106`.
FrameError,
/// Wire code `0x200`. Returned when a header block cannot be decoded.
QpackDecompressionFailed,
/// Wire code `0x201`.
QpackEncoderStreamError,
/// Wire code `0x202`.
QpackDecoderStreamError,
/// Wraps any transport error other than the transport's own `Done`
/// (see the `From<super::Error>` conversion). Wire code `0xFF`.
TransportError(crate::Error),
}
impl Error {
    /// Returns the numeric code that represents this error on the wire.
    ///
    /// Within this file the value is passed as the error code argument of
    /// `conn.close()`.
    fn to_wire(self) -> u64 {
        match self {
            // 0x100 ..= 0x110
            Error::Done => 0x100,
            Error::GeneralProtocolError => 0x101,
            Error::InternalError => 0x102,
            Error::StreamCreationError => 0x103,
            Error::ClosedCriticalStream => 0x104,
            Error::FrameUnexpected => 0x105,
            Error::FrameError => 0x106,
            Error::ExcessiveLoad => 0x107,
            Error::IdError => 0x108,
            Error::SettingsError => 0x109,
            Error::MissingSettings => 0x10A,
            Error::RequestRejected => 0x10B,
            Error::RequestCancelled => 0x10C,
            Error::RequestIncomplete => 0x10D,
            Error::EarlyResponse => 0x10E,
            Error::ConnectError => 0x10F,
            Error::VersionFallback => 0x110,

            // 0x200 ..= 0x202: QPACK errors.
            Error::QpackDecompressionFailed => 0x200,
            Error::QpackEncoderStreamError => 0x201,
            Error::QpackDecoderStreamError => 0x202,

            // Variants outside the two ranges above.
            Error::BufferTooShort => 0x999,
            Error::TransportError(_) => 0xFF,
        }
    }

    /// Returns the negative integer that identifies this error as a
    /// `libc::ssize_t`.
    ///
    /// The numbering has gaps (e.g. there is no `-4` or `-6`); the values are
    /// reproduced exactly as assigned.
    fn to_c(self) -> libc::ssize_t {
        match self {
            Error::Done => -1,
            Error::BufferTooShort => -2,
            Error::GeneralProtocolError => -3,
            Error::InternalError => -5,
            Error::RequestCancelled => -7,
            Error::RequestIncomplete => -8,
            Error::ConnectError => -9,
            Error::ExcessiveLoad => -10,
            Error::VersionFallback => -11,
            Error::IdError => -13,
            Error::StreamCreationError => -15,
            Error::ClosedCriticalStream => -17,
            Error::EarlyResponse => -19,
            Error::MissingSettings => -20,
            Error::FrameUnexpected => -21,
            Error::RequestRejected => -22,
            Error::SettingsError => -23,
            Error::FrameError => -24,
            Error::QpackDecompressionFailed => -25,
            Error::QpackEncoderStreamError => -26,
            Error::QpackDecoderStreamError => -27,
            Error::TransportError(_) => -28,
        }
    }
}
impl std::fmt::Display for Error {
    /// Writes the error using its `Debug` representation.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl std::error::Error for Error {
// No underlying cause is exposed: this returns `None` for every variant,
// including `TransportError`.
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl std::convert::From<super::Error> for Error {
fn from(err: super::Error) -> Self {
match err {
super::Error::Done => Error::Done,
_ => Error::TransportError(err),
}
}
}
impl std::convert::From<octets::BufferTooShortError> for Error {
// Lets `?` turn an `octets` buffer error into `Error::BufferTooShort`; the
// source error value itself is discarded.
fn from(_err: octets::BufferTooShortError) -> Self {
Error::BufferTooShort
}
}
/// Settings used when creating a `Connection`.
///
/// Each field is `None` until its setter is called. `Connection::new()` copies
/// the values into the connection's local settings, which `send_settings()`
/// places in the SETTINGS frame.
pub struct Config {
// Value for the `max_header_list_size` setting; also used as the size limit
// when decoding received header blocks.
max_header_list_size: Option<u64>,
// Value for the `qpack_max_table_capacity` setting.
qpack_max_table_capacity: Option<u64>,
// Value for the `qpack_blocked_streams` setting.
qpack_blocked_streams: Option<u64>,
}
impl Config {
    /// Creates a configuration with every setting left unset (`None`).
    ///
    /// The current implementation always returns `Ok`.
    pub fn new() -> Result<Config> {
        let config = Config {
            max_header_list_size: None,
            qpack_max_table_capacity: None,
            qpack_blocked_streams: None,
        };

        Ok(config)
    }

    /// Stores `v` as the `max_header_list_size` setting.
    pub fn set_max_header_list_size(&mut self, v: u64) {
        let value = Some(v);
        self.max_header_list_size = value;
    }

    /// Stores `v` as the `qpack_max_table_capacity` setting.
    pub fn set_qpack_max_table_capacity(&mut self, v: u64) {
        let value = Some(v);
        self.qpack_max_table_capacity = value;
    }

    /// Stores `v` as the `qpack_blocked_streams` setting.
    pub fn set_qpack_blocked_streams(&mut self, v: u64) {
        let value = Some(v);
        self.qpack_blocked_streams = value;
    }
}
/// A name-value pair representing a single header field.
///
/// The name is stored lowercased; the value is stored unchanged.
#[derive(Clone, Debug, PartialEq)]
pub struct Header(String, String);

impl Header {
    /// Builds a header from `name` and `value`.
    ///
    /// `name` is converted with `str::to_lowercase()`; `value` is copied
    /// as-is.
    pub fn new(name: &str, value: &str) -> Header {
        let lowered_name = name.to_lowercase();
        let owned_value = value.to_owned();

        Header(lowered_name, owned_value)
    }

    /// Returns the (lowercased) header name.
    pub fn name(&self) -> &str {
        self.0.as_str()
    }

    /// Returns the header value.
    pub fn value(&self) -> &str {
        self.1.as_str()
    }
}
/// Events reported by `Connection::poll()`, each paired with a stream ID.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
/// A HEADERS frame was received and decoded into this header list.
Headers(Vec<Header>),
/// The stream is in its data state; the body can be read with
/// `Connection::recv_body()`.
Data,
/// The transport reported the stream as finished.
Finished,
}
// Settings values for one side of the connection. Used both for the local
// values (copied from `Config`) and for the values received in the peer's
// SETTINGS frame. `None` means the value was not set / not received.
struct ConnectionSettings {
pub max_header_list_size: Option<u64>,
pub qpack_max_table_capacity: Option<u64>,
pub qpack_blocked_streams: Option<u64>,
}
// Stream IDs of the QPACK encoder and decoder streams for one side of the
// connection; `None` until the corresponding stream has been opened (local)
// or its type has been received (peer).
struct QpackStreams {
pub encoder_stream_id: Option<u64>,
pub decoder_stream_id: Option<u64>,
}
/// State of one HTTP/3 connection layered on a transport connection.
///
/// The transport connection itself is not stored; it is passed to every
/// method that needs it.
pub struct Connection {
// Whether this endpoint acts as the server.
is_server: bool,
// Next request stream ID to hand out (despite the name); starts at 0 and
// advances by 4.
highest_request_stream_id: u64,
// Next unidirectional stream ID to hand out (despite the name); starts at
// 0x3 for a server and 0x2 for a client and advances by 4.
highest_uni_stream_id: u64,
// Per-stream parsing state, keyed by stream ID.
streams: HashMap<u64, stream::Stream>,
// Values sent in our SETTINGS frame.
local_settings: ConnectionSettings,
// Values received in the peer's SETTINGS frame.
peer_settings: ConnectionSettings,
// ID of the control stream opened by `send_settings()`.
control_stream_id: Option<u64>,
// ID of the control stream opened by the peer, once its type was read.
peer_control_stream_id: Option<u64>,
qpack_encoder: qpack::Encoder,
qpack_decoder: qpack::Decoder,
local_qpack_streams: QpackStreams,
peer_qpack_streams: QpackStreams,
// Largest value received in a MAX_PUSH_ID frame so far (initially 0).
max_push_id: u64,
// Streams seen as finished whose `Event::Finished` has not yet been
// returned by `poll()`.
finished_streams: VecDeque<u64>,
// Set once GREASE frames have been sent, so they are only sent one time.
frames_greased: bool,
}
impl Connection {
/// Builds the initial connection state without touching the transport.
///
/// Local settings are copied from `config`; peer settings and all stream IDs
/// start out unset. The first unidirectional stream ID is `0x3` for a server
/// and `0x2` for a client.
fn new(config: &Config, is_server: bool) -> Result<Connection> {
    let first_uni_stream_id = if is_server { 0x3 } else { 0x2 };

    let local_settings = ConnectionSettings {
        max_header_list_size: config.max_header_list_size,
        qpack_max_table_capacity: config.qpack_max_table_capacity,
        qpack_blocked_streams: config.qpack_blocked_streams,
    };

    // Nothing is known about the peer until its SETTINGS frame arrives.
    let peer_settings = ConnectionSettings {
        max_header_list_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
    };

    let local_qpack_streams = QpackStreams {
        encoder_stream_id: None,
        decoder_stream_id: None,
    };

    let peer_qpack_streams = QpackStreams {
        encoder_stream_id: None,
        decoder_stream_id: None,
    };

    let connection = Connection {
        is_server,
        highest_request_stream_id: 0,
        highest_uni_stream_id: first_uni_stream_id,
        streams: HashMap::new(),
        local_settings,
        peer_settings,
        control_stream_id: None,
        peer_control_stream_id: None,
        qpack_encoder: qpack::Encoder::new(),
        qpack_decoder: qpack::Decoder::new(),
        local_qpack_streams,
        peer_qpack_streams,
        max_push_id: 0,
        finished_streams: VecDeque::new(),
        frames_greased: false,
    };

    Ok(connection)
}
/// Creates an HTTP/3 connection on top of the transport connection `conn`.
///
/// Opens the local control stream and sends SETTINGS on it; a failure there
/// is returned to the caller. The QPACK encoder/decoder streams (and the
/// GREASE stream, when `conn.grease` is set) are opened on a best-effort
/// basis: errors from those steps are ignored.
pub fn with_transport(
    conn: &mut super::Connection, config: &Config,
) -> Result<Connection> {
    let is_server = conn.is_server;
    let mut h3 = Connection::new(config, is_server)?;

    h3.send_settings(conn)?;

    // Best effort: failures to open these streams are deliberately ignored.
    let _ = h3.open_qpack_encoder_stream(conn);
    let _ = h3.open_qpack_decoder_stream(conn);

    if conn.grease {
        let _ = h3.open_grease_stream(conn);
    }

    Ok(h3)
}
/// Sends a request made of `headers` on a newly allocated request stream.
///
/// Returns the ID of the stream used. When `fin` is true the stream is
/// finished together with the header block.
///
/// # Errors
///
/// Returns `Error::IdError` when no stream ID is available, or whatever
/// error sending the HEADERS frame produces. The stream entry is recorded
/// before the headers are sent, so it remains even if sending fails.
pub fn send_request(
    &mut self, conn: &mut super::Connection, headers: &[Header], fin: bool,
) -> Result<u64> {
    let stream_id = self.get_available_request_stream()?;

    let stream = stream::Stream::new(stream_id, true);
    self.streams.insert(stream_id, stream);

    self.send_headers(conn, stream_id, headers, fin)
        .map(|()| stream_id)
}
/// Sends a response made of `headers` on the stream `stream_id`.
///
/// When `fin` is true the stream is finished together with the header block.
/// Any error from sending the HEADERS frame is returned unchanged.
pub fn send_response(
    &mut self, conn: &mut super::Connection, stream_id: u64,
    headers: &[Header], fin: bool,
) -> Result<()> {
    self.send_headers(conn, stream_id, headers, fin)
}
fn encode_header_block(&mut self, headers: &[Header]) -> Result<Vec<u8>> {
let headers_len = headers
.iter()
.fold(0, |acc, h| acc + h.value().len() + h.name().len() + 32);
let mut header_block = vec![0; headers_len];
let len = self
.qpack_encoder
.encode(&headers, &mut header_block)
.map_err(|_| Error::InternalError)?;
header_block.truncate(len);
Ok(header_block)
}
/// Encodes `headers` and writes them to `stream_id` as a HEADERS frame.
///
/// The frame is written with three `stream_send()` calls: frame type,
/// payload length, then the encoded header block (carrying `fin`). The first
/// time this runs on a connection with `conn.grease` set, GREASE frames are
/// sent on the same stream beforehand.
fn send_headers(
    &mut self, conn: &mut super::Connection, stream_id: u64,
    headers: &[Header], fin: bool,
) -> Result<()> {
    let header_block = self.encode_header_block(headers)?;

    // GREASE frames are only sent once per connection.
    let should_grease = conn.grease && !self.frames_greased;
    if should_grease {
        self.send_grease_frames(conn, stream_id)?;
        self.frames_greased = true;
    }

    trace!(
        "{} tx frm HEADERS stream={} len={} fin={}",
        conn.trace_id(),
        stream_id,
        header_block.len(),
        fin
    );

    // Scratch space shared by the frame type and length varints.
    let mut scratch = [42; 10];
    let mut b = octets::Octets::with_slice(&mut scratch);

    let frame_type = b.put_varint(frame::HEADERS_FRAME_TYPE_ID)?;
    conn.stream_send(stream_id, frame_type, false)?;

    let frame_len = b.put_varint(header_block.len() as u64)?;
    conn.stream_send(stream_id, frame_len, false)?;

    conn.stream_send(stream_id, &header_block, fin)?;

    Ok(())
}
/// Sends (part of) `body` on `stream_id` as a single DATA frame.
///
/// The amount sent is limited by the stream's current capacity minus the
/// frame header overhead, so fewer than `body.len()` bytes may be written;
/// in that case `fin` is not applied. Returns the number of body bytes
/// written as reported by the transport.
///
/// # Errors
///
/// * `Error::FrameUnexpected` if `stream_id` is not a known stream or is not
///   a multiple of 4.
/// * `Error::Done` if the stream capacity does not exceed the frame header
///   overhead.
/// * Any error returned by the transport.
pub fn send_body(
    &mut self, conn: &mut super::Connection, stream_id: u64, body: &[u8],
    fin: bool,
) -> Result<usize> {
    let is_known_request_stream =
        self.streams.contains_key(&stream_id) && stream_id % 4 == 0;

    if !is_known_request_stream {
        return Err(Error::FrameUnexpected);
    }

    // The overhead is computed from the full body length, even though the
    // frame may end up carrying less.
    let overhead = octets::varint_len(frame::DATA_FRAME_TYPE_ID) +
        octets::varint_len(body.len() as u64);

    let stream_cap = conn.stream_capacity(stream_id)?;

    if stream_cap <= overhead {
        return Err(Error::Done);
    }

    let body_len = std::cmp::min(body.len(), stream_cap - overhead);

    // Only finish the stream when the whole body fits in this frame.
    let fin = fin && body_len == body.len();

    trace!(
        "{} tx frm DATA stream={} len={} fin={}",
        conn.trace_id(),
        stream_id,
        body_len,
        fin
    );

    let mut scratch = [42; 10];
    let mut b = octets::Octets::with_slice(&mut scratch);

    let frame_type = b.put_varint(frame::DATA_FRAME_TYPE_ID)?;
    conn.stream_send(stream_id, frame_type, false)?;

    let frame_len = b.put_varint(body_len as u64)?;
    conn.stream_send(stream_id, frame_len, false)?;

    let payload = &body[..body_len];

    Ok(conn.stream_send(stream_id, payload, fin)?)
}
pub fn recv_body(
&mut self, conn: &mut super::Connection, stream_id: u64, out: &mut [u8],
) -> Result<usize> {
let stream = self.streams.get_mut(&stream_id).ok_or(Error::Done)?;
if stream.state() != stream::State::Data {
return Err(Error::Done);
}
let read = stream.try_consume_data(conn, out)?;
if conn.stream_finished(stream_id) {
self.finished_streams.push_back(stream_id);
}
Ok(read)
}
/// Processes incoming data and returns the next event, if any.
///
/// Order of work:
/// 1. the peer's control, QPACK encoder and QPACK decoder streams (when
///    known) are processed, and any error from them is returned;
/// 2. a queued `Event::Finished` is returned if one is pending;
/// 3. each readable stream is processed until one yields an event.
///
/// Returns `Error::Done` when there is nothing to report.
pub fn poll(&mut self, conn: &mut super::Connection) -> Result<(u64, Event)> {
    if let Some(control_id) = self.peer_control_stream_id {
        self.process_control_stream(conn, control_id)?;
    }

    if let Some(encoder_id) = self.peer_qpack_streams.encoder_stream_id {
        self.process_control_stream(conn, encoder_id)?;
    }

    if let Some(decoder_id) = self.peer_qpack_streams.decoder_stream_id {
        self.process_control_stream(conn, decoder_id)?;
    }

    if let Some(finished) = self.finished_streams.pop_front() {
        return Ok((finished, Event::Finished));
    }

    for s in conn.readable() {
        trace!("{} stream id {} is readable", conn.trace_id(), s);

        // `Done` only means this stream produced no event.
        let event = match self.process_readable_stream(conn, s) {
            Err(Error::Done) => None,
            Err(e) => return Err(e),
            Ok(ev) => Some(ev),
        };

        // Remember finished streams so a later call can report them.
        if conn.stream_finished(s) {
            self.finished_streams.push_back(s);
        }

        match event {
            Some(ev) => return Ok(ev),
            None => continue,
        }
    }

    Err(Error::Done)
}
fn get_available_request_stream(&mut self) -> Result<u64> {
if self.highest_request_stream_id < std::u64::MAX {
let ret = self.highest_request_stream_id;
self.highest_request_stream_id += 4;
return Ok(ret);
}
Err(Error::IdError)
}
fn get_available_uni_stream(&mut self) -> Result<u64> {
if self.highest_uni_stream_id < std::u64::MAX {
let ret = self.highest_uni_stream_id;
self.highest_uni_stream_id += 4;
return Ok(ret);
}
Err(Error::IdError)
}
/// Opens a new unidirectional stream by sending the stream type `ty` as a
/// varint on the next available unidirectional stream ID.
///
/// Returns the ID of the stream. The ID is consumed even when sending the
/// type fails.
fn open_uni_stream(
    &mut self, conn: &mut super::Connection, ty: u64,
) -> Result<u64> {
    let stream_id = self.get_available_uni_stream()?;

    let mut scratch = [0; 8];
    let mut b = octets::Octets::with_slice(&mut scratch);

    let encoded_type = b.put_varint(ty)?;
    conn.stream_send(stream_id, encoded_type, false)?;

    Ok(stream_id)
}
/// Opens the local QPACK encoder stream and records its ID.
///
/// On failure the recorded ID is left unchanged and the error is returned.
fn open_qpack_encoder_stream(
    &mut self, conn: &mut super::Connection,
) -> Result<()> {
    let stream_id =
        self.open_uni_stream(conn, stream::QPACK_ENCODER_STREAM_TYPE_ID)?;

    self.local_qpack_streams.encoder_stream_id = Some(stream_id);

    Ok(())
}
/// Opens the local QPACK decoder stream and records its ID.
///
/// On failure the recorded ID is left unchanged and the error is returned.
fn open_qpack_decoder_stream(
    &mut self, conn: &mut super::Connection,
) -> Result<()> {
    let stream_id =
        self.open_uni_stream(conn, stream::QPACK_DECODER_STREAM_TYPE_ID)?;

    self.local_qpack_streams.decoder_stream_id = Some(stream_id);

    Ok(())
}
/// Sends two GREASE frames on `stream_id`.
///
/// The first frame has a random GREASE type and an empty payload; the second
/// has another random GREASE type and carries an 18-byte text payload.
fn send_grease_frames(
    &mut self, conn: &mut super::Connection, stream_id: u64,
) -> Result<()> {
    let mut scratch = [42; 128];
    let mut b = octets::Octets::with_slice(&mut scratch);

    trace!("{} tx frm GREASE stream={}", conn.trace_id(), stream_id);

    // First frame: GREASE type, zero-length payload.
    let empty_frame_type = b.put_varint(grease_value())?;
    conn.stream_send(stream_id, empty_frame_type, false)?;

    let empty_frame_len = b.put_varint(0)?;
    conn.stream_send(stream_id, empty_frame_len, false)?;

    // Second frame: GREASE type followed by a fixed payload.
    let payload = b"GREASE is the word";

    let frame_type = b.put_varint(grease_value())?;
    conn.stream_send(stream_id, frame_type, false)?;

    let frame_len = b.put_varint(payload.len() as u64)?;
    conn.stream_send(stream_id, frame_len, false)?;

    conn.stream_send(stream_id, payload, false)?;

    Ok(())
}
/// Opens a unidirectional stream with a random GREASE type and writes a
/// fixed text payload to it.
///
/// Running out of stream IDs (`Error::IdError`) is not treated as a failure:
/// it is logged and `Ok(())` is returned. Any other error is propagated.
fn open_grease_stream(&mut self, conn: &mut super::Connection) -> Result<()> {
    let stream_id = match self.open_uni_stream(conn, grease_value()) {
        Ok(id) => id,

        Err(Error::IdError) => {
            trace!("{} GREASE stream blocked", conn.trace_id(),);
            return Ok(());
        },

        Err(e) => return Err(e),
    };

    trace!("{} open GREASE stream {}", conn.trace_id(), stream_id);

    conn.stream_send(stream_id, b"GREASE is the word", false)?;

    Ok(())
}
/// Opens the local control stream and sends the SETTINGS frame on it.
///
/// The frame carries the local settings and, when `conn.grease` is set, one
/// GREASE setting whose identifier and value are both random GREASE values.
fn send_settings(&mut self, conn: &mut super::Connection) -> Result<()> {
    let control_id =
        self.open_uni_stream(conn, stream::HTTP3_CONTROL_STREAM_TYPE_ID)?;
    self.control_stream_id = Some(control_id);

    let grease = match conn.grease {
        true => Some((grease_value(), grease_value())),
        false => None,
    };

    let settings = &self.local_settings;

    let frame = frame::Frame::Settings {
        max_header_list_size: settings.max_header_list_size,
        qpack_max_table_capacity: settings.qpack_max_table_capacity,
        qpack_blocked_streams: settings.qpack_blocked_streams,
        grease,
    };

    let mut scratch = [42; 128];
    let mut b = octets::Octets::with_slice(&mut scratch);

    frame.to_bytes(&mut b)?;

    // Number of bytes the serialized frame occupies in `scratch`.
    let frame_len = b.off();

    conn.stream_send(control_id, &scratch[..frame_len], false)?;

    Ok(())
}
fn process_control_stream(
&mut self, conn: &mut super::Connection, stream_id: u64,
) -> Result<()> {
match self.process_readable_stream(conn, stream_id) {
Ok(_) => (),
Err(Error::Done) => (),
Err(e) => return Err(e),
};
if conn.stream_finished(stream_id) {
conn.close(
true,
Error::ClosedCriticalStream.to_wire(),
b"Critical stream closed.",
)?;
return Err(Error::ClosedCriticalStream);
}
Ok(())
}
// Drives the parsing state machine of `stream_id` for as long as progress can
// be made.
//
// A stream entry is created on first use. Each loop iteration looks at the
// stream's current state and tries to complete it (stream type, push ID,
// frame type, frame length, frame payload, ...).
//
// Returns `Ok((stream_id, event))` when a frame produces an event or when the
// stream is in the `Data` state. Returns `Err(Error::Done)` when the loop ends
// without an event; other errors are propagated, and for most protocol
// violations the connection is closed before returning.
fn process_readable_stream(
&mut self, conn: &mut super::Connection, stream_id: u64,
) -> Result<(u64, Event)> {
// Streams first seen here are created with `false` as the second argument,
// whereas `send_request()` passes `true` for streams it opens.
self.streams
.entry(stream_id)
.or_insert_with(|| stream::Stream::new(stream_id, false));
while let Some(stream) = self.streams.get_mut(&stream_id) {
match stream.state() {
// Read the stream type varint and register critical streams.
stream::State::StreamType => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
// Retry; the loop is left through the `?` on `try_fill_buffer`
// once that call fails.
Err(_) => continue,
};
let ty = stream::Type::deserialize(varint)?;
if let Err(e) = stream.set_ty(ty) {
conn.close(true, e.to_wire(), b"")?;
return Err(e);
}
match &ty {
stream::Type::Control => {
// Only one control stream per peer is allowed.
if self.peer_control_stream_id.is_some() {
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Received multiple control streams",
)?;
return Err(Error::StreamCreationError);
}
trace!(
"{} open peer's control stream {}",
conn.trace_id(),
stream_id
);
self.peer_control_stream_id = Some(stream_id);
},
stream::Type::Push => {
// A server must not receive push streams.
if self.is_server {
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Server received push stream.",
)?;
return Err(Error::StreamCreationError);
}
},
stream::Type::QpackEncoder => {
// Only one QPACK encoder stream per peer is allowed.
if self.peer_qpack_streams.encoder_stream_id.is_some()
{
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Received multiple QPACK encoder streams",
)?;
return Err(Error::StreamCreationError);
}
self.peer_qpack_streams.encoder_stream_id =
Some(stream_id);
},
stream::Type::QpackDecoder => {
// Only one QPACK decoder stream per peer is allowed.
if self.peer_qpack_streams.decoder_stream_id.is_some()
{
conn.close(
true,
Error::StreamCreationError.to_wire(),
b"Received multiple QPACK decoder streams",
)?;
return Err(Error::StreamCreationError);
}
self.peer_qpack_streams.decoder_stream_id =
Some(stream_id);
},
// Unknown stream types need no bookkeeping here.
stream::Type::Unknown => {
},
// NOTE(review): relies on a stream in the `StreamType` state never
// deserializing to `Request` -- confirm in the `stream` module.
stream::Type::Request => unreachable!(),
}
},
// Read the push ID varint.
stream::State::PushId => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
Err(_) => continue,
};
if let Err(e) = stream.set_push_id(varint) {
conn.close(true, e.to_wire(), b"")?;
return Err(e);
}
},
// Read the frame type varint.
stream::State::FrameType => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
Err(_) => continue,
};
match stream.set_frame_type(varint) {
// Unexpected frames get a close reason naming the frame type.
Err(Error::FrameUnexpected) => {
let msg = format!("Unexpected frame type {}", varint);
conn.close(
true,
Error::FrameUnexpected.to_wire(),
msg.as_bytes(),
)?;
return Err(Error::FrameUnexpected);
},
Err(e) => {
conn.close(
true,
e.to_wire(),
b"Error handling frame.",
)?;
return Err(e);
},
_ => (),
}
},
// Read the frame payload length varint.
stream::State::FramePayloadLen => {
stream.try_fill_buffer(conn)?;
let varint = match stream.try_consume_varint() {
Ok(v) => v,
Err(_) => continue,
};
if let Err(e) = stream.set_frame_payload_len(varint) {
conn.close(true, e.to_wire(), b"")?;
return Err(e);
}
},
// Parse the frame payload and act on the resulting frame.
stream::State::FramePayload => {
stream.try_fill_buffer(conn)?;
let frame = match stream.try_consume_frame() {
Ok(frame) => frame,
// `Done` is returned as-is, without closing the connection.
Err(Error::Done) => return Err(Error::Done),
Err(e) => {
conn.close(
true,
e.to_wire(),
b"Error handling frame.",
)?;
return Err(e);
},
};
match self.process_frame(conn, stream_id, frame) {
Ok(ev) => return Ok(ev),
// The frame produced no event: keep parsing this stream.
Err(Error::Done) => (),
Err(e) => return Err(e),
};
},
// Body data is not read here; the caller is told to fetch it.
stream::State::Data => {
return Ok((stream_id, Event::Data));
},
// QPACK instructions are read and discarded. The loop has no `break`;
// it ends when `stream_recv` returns an error, which `?` propagates.
stream::State::QpackInstruction => {
let mut d = [0; 4096];
loop {
conn.stream_recv(stream_id, &mut d)?;
}
},
// Stop reading from the stream and leave the state machine.
stream::State::Drain => {
conn.stream_shutdown(stream_id, crate::Shutdown::Read, 0)?;
break;
},
}
}
Err(Error::Done)
}
fn process_frame(
&mut self, conn: &mut super::Connection, stream_id: u64,
frame: frame::Frame,
) -> Result<(u64, Event)> {
trace!(
"{} rx frm {:?} stream={}",
conn.trace_id(),
frame,
stream_id
);
match frame {
frame::Frame::Settings {
max_header_list_size,
qpack_max_table_capacity,
qpack_blocked_streams,
..
} => {
self.peer_settings = ConnectionSettings {
max_header_list_size,
qpack_max_table_capacity,
qpack_blocked_streams,
};
},
frame::Frame::Headers { mut header_block } => {
if Some(stream_id) == self.peer_control_stream_id {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"HEADERS received on control stream",
)?;
return Err(Error::FrameUnexpected);
}
let max_size = self
.local_settings
.max_header_list_size
.unwrap_or(std::u64::MAX);
let headers = self
.qpack_decoder
.decode(&mut header_block[..], max_size)
.map_err(|e| match e {
qpack::Error::HeaderListTooLarge => Error::ExcessiveLoad,
_ => Error::QpackDecompressionFailed,
})?;
return Ok((stream_id, Event::Headers(headers)));
},
frame::Frame::Data { .. } => {
if Some(stream_id) == self.peer_control_stream_id {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"DATA received on control stream",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::GoAway {
stream_id: goaway_stream_id,
} => {
if self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"GOWAY received on server",
)?;
return Err(Error::FrameUnexpected);
}
if Some(stream_id) != self.peer_control_stream_id {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"GOAWAY received on non-control stream",
)?;
return Err(Error::FrameUnexpected);
}
if goaway_stream_id % 4 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"GOAWAY received with ID of non-request stream",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::MaxPushId { push_id } => {
if Some(stream_id) != self.peer_control_stream_id {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"MAX_PUSH_ID received on non-control stream",
)?;
return Err(Error::FrameUnexpected);
}
if !self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"MAX_PUSH_ID received by client",
)?;
return Err(Error::FrameUnexpected);
}
if push_id < self.max_push_id {
conn.close(
true,
Error::IdError.to_wire(),
b"MAX_PUSH_ID reduced limit",
)?;
return Err(Error::IdError);
}
self.max_push_id = push_id;
},
frame::Frame::PushPromise { .. } => {
if self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PUSH_PROMISE received by server",
)?;
return Err(Error::FrameUnexpected);
}
if stream_id % 4 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"PUSH_PROMISE received on non-request stream",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::DuplicatePush { .. } => {
if self.is_server {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"DUPLICATE_PUSH received by server",
)?;
return Err(Error::FrameUnexpected);
}
if stream_id % 4 != 0 {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"DUPLICATE_PUSH received on non-request stream",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::CancelPush { .. } => {
if Some(stream_id) != self.peer_control_stream_id {
conn.close(
true,
Error::FrameUnexpected.to_wire(),
b"CANCEL_PUSH received on non-control stream",
)?;
return Err(Error::FrameUnexpected);
}
},
frame::Frame::Unknown => (),
}
Err(Error::Done)
}
}
/// Generates a random GREASE value of the form `31 * N + 33`.
///
/// `N` is a random `u64` capped at 148_764_065_110_560_899, which keeps the
/// result at or below `2^62 - 2`.
fn grease_value() -> u64 {
    const MAX_N: u64 = 148_764_065_110_560_899;

    let random = super::rand::rand_u64();
    let n = std::cmp::min(random, MAX_N);

    33 + 31 * n
}
// Helpers for testing: a client and a server HTTP/3 connection wired together
// through a transport-level test pipe.
#[doc(hidden)]
pub mod testing {
use super::*;
use crate::testing;
// A client/server pair of HTTP/3 connections plus the transport pipe that
// links them.
pub struct Session {
pub pipe: testing::Pipe,
pub client: Connection,
pub server: Connection,
// Scratch buffer handed to the pipe when moving data between endpoints.
buf: [u8; 65535],
}
impl Session {
// Creates a session with a fixed transport configuration (certificate and
// key from `examples/`, small flow-control limits, 5 streams of each kind)
// and a default HTTP/3 config.
//
// NOTE(review): the application protocol configured here is `h3`, not the
// `h3-23` of `APPLICATION_PROTOCOL` -- confirm this is intentional.
pub fn default() -> Result<Session> {
let mut config = crate::Config::new(crate::PROTOCOL_VERSION)?;
config.load_cert_chain_from_pem_file("examples/cert.crt")?;
config.load_priv_key_from_pem_file("examples/cert.key")?;
config.set_application_protos(b"\x02h3")?;
config.set_initial_max_data(1500);
config.set_initial_max_stream_data_bidi_local(150);
config.set_initial_max_stream_data_bidi_remote(150);
config.set_initial_max_stream_data_uni(150);
config.set_initial_max_streams_bidi(5);
config.set_initial_max_streams_uni(5);
config.verify_peer(false);
let h3_config = Config::new()?;
Session::with_configs(&mut config, &h3_config)
}
// Creates a session from explicit transport and HTTP/3 configurations. The
// same HTTP/3 config is used for both the client and the server.
pub fn with_configs(
config: &mut crate::Config, h3_config: &Config,
) -> Result<Session> {
Ok(Session {
pipe: testing::Pipe::with_config(config)?,
client: Connection::new(&h3_config, false)?,
server: Connection::new(&h3_config, true)?,
buf: [0; 65535],
})
}
// Completes the transport handshake, then performs for each endpoint the
// same steps as `Connection::with_transport()` (settings, QPACK streams,
// optional GREASE stream), advancing the pipe after each step. Finally both
// endpoints are polled until they stop returning `Ok`.
//
// Unlike `with_transport()`, failures to open the QPACK or GREASE streams
// are returned; failures to advance the pipe are ignored.
pub fn handshake(&mut self) -> Result<()> {
self.pipe.handshake(&mut self.buf)?;
self.client.send_settings(&mut self.pipe.client)?;
self.pipe.advance(&mut self.buf).ok();
self.client
.open_qpack_encoder_stream(&mut self.pipe.client)?;
self.pipe.advance(&mut self.buf).ok();
self.client
.open_qpack_decoder_stream(&mut self.pipe.client)?;
self.pipe.advance(&mut self.buf).ok();
if self.pipe.client.grease {
self.client.open_grease_stream(&mut self.pipe.client)?;
}
self.pipe.advance(&mut self.buf).ok();
self.server.send_settings(&mut self.pipe.server)?;
self.pipe.advance(&mut self.buf).ok();
self.server
.open_qpack_encoder_stream(&mut self.pipe.server)?;
self.pipe.advance(&mut self.buf).ok();
self.server
.open_qpack_decoder_stream(&mut self.pipe.server)?;
self.pipe.advance(&mut self.buf).ok();
if self.pipe.server.grease {
self.server.open_grease_stream(&mut self.pipe.server)?;
}
self.advance().ok();
// Drain everything produced during setup; any error (including `Done`)
// ends the loop.
while self.client.poll(&mut self.pipe.client).is_ok() {
}
while self.server.poll(&mut self.pipe.server).is_ok() {
}
Ok(())
}
// Advances the transport pipe using the session's scratch buffer.
pub fn advance(&mut self) -> crate::Result<()> {
self.pipe.advance(&mut self.buf)
}
// Polls the client side for the next event.
pub fn poll_client(&mut self) -> Result<(u64, Event)> {
self.client.poll(&mut self.pipe.client)
}
// Polls the server side for the next event.
pub fn poll_server(&mut self) -> Result<(u64, Event)> {
self.server.poll(&mut self.pipe.server)
}
// Sends a fixed GET request from the client and advances the pipe. Returns
// the stream ID used and the headers that were sent.
pub fn send_request(&mut self, fin: bool) -> Result<(u64, Vec<Header>)> {
let req = vec![
Header::new(":method", "GET"),
Header::new(":scheme", "https"),
Header::new(":authority", "quic.tech"),
Header::new(":path", "/test"),
Header::new("user-agent", "quiche-test"),
];
let stream =
self.client.send_request(&mut self.pipe.client, &req, fin)?;
self.advance().ok();
Ok((stream, req))
}
// Sends a fixed 200 response from the server on `stream` and advances the
// pipe. Returns the headers that were sent.
pub fn send_response(
&mut self, stream: u64, fin: bool,
) -> Result<Vec<Header>> {
let resp = vec![
Header::new(":status", "200"),
Header::new("server", "quiche-test"),
];
self.server.send_response(
&mut self.pipe.server,
stream,
&resp,
fin,
)?;
self.advance().ok();
Ok(resp)
}
// Sends a fixed 10-byte body from the client on `stream` and advances the
// pipe. Returns the bytes handed to `send_body()`; the number of bytes it
// reports as written is not checked.
pub fn send_body_client(
&mut self, stream: u64, fin: bool,
) -> Result<Vec<u8>> {
let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
self.client
.send_body(&mut self.pipe.client, stream, &bytes, fin)?;
self.advance().ok();
Ok(bytes)
}
// Reads body data received by the client on `stream` into `buf`.
pub fn recv_body_client(
&mut self, stream: u64, buf: &mut [u8],
) -> Result<usize> {
self.client.recv_body(&mut self.pipe.client, stream, buf)
}
// Sends a fixed 10-byte body from the server on `stream` and advances the
// pipe. Returns the bytes handed to `send_body()`; the number of bytes it
// reports as written is not checked.
pub fn send_body_server(
&mut self, stream: u64, fin: bool,
) -> Result<Vec<u8>> {
let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
self.server
.send_body(&mut self.pipe.server, stream, &bytes, fin)?;
self.advance().ok();
Ok(bytes)
}
// Reads body data received by the server on `stream` into `buf`.
pub fn recv_body_server(
&mut self, stream: u64, buf: &mut [u8],
) -> Result<usize> {
self.server.recv_body(&mut self.pipe.server, stream, buf)
}
// Serializes `frame` and writes it straight to the client's transport
// stream `stream_id`, bypassing the HTTP/3 connection's own checks, then
// advances the pipe.
pub fn send_frame_client(
&mut self, frame: frame::Frame, stream_id: u64, fin: bool,
) -> Result<()> {
let mut d = [42; 65535];
let mut b = octets::Octets::with_slice(&mut d);
frame.to_bytes(&mut b)?;
let off = b.off();
self.pipe.client.stream_send(stream_id, &d[..off], fin)?;
self.advance().ok();
Ok(())
}
// Serializes `frame` and writes it straight to the server's transport
// stream `stream_id`, bypassing the HTTP/3 connection's own checks, then
// advances the pipe.
pub fn send_frame_server(
&mut self, frame: frame::Frame, stream_id: u64, fin: bool,
) -> Result<()> {
let mut d = [42; 65535];
let mut b = octets::Octets::with_slice(&mut d);
frame.to_bytes(&mut b)?;
let off = b.off();
self.pipe.server.stream_send(stream_id, &d[..off], fin)?;
self.advance().ok();
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::testing::*;
// A generated GREASE value must stay below `2^62 - 1`. Only one random sample
// is checked per run.
#[test]
fn grease_value_in_varint_limit() {
assert!(grease_value() < 2u64.pow(62) - 1);
}
// Request and response both consist of headers only, each sent with `fin`.
// Each side sees `Headers` followed by `Finished`.
#[test]
fn request_no_body_response_no_body() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(true).unwrap();
assert_eq!(stream, 0);
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
let resp = s.send_response(stream, true).unwrap();
assert_eq!(s.poll_client(), Ok((stream, Event::Headers(resp))));
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
assert_eq!(s.poll_client(), Err(Error::Done));
}
// Headers-only request; the response carries headers plus one DATA frame that
// finishes the stream. The client sees `Headers`, `Data`, then `Finished`.
#[test]
fn request_no_body_response_one_chunk() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(true).unwrap();
assert_eq!(stream, 0);
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
let resp = s.send_response(stream, false).unwrap();
let body = s.send_body_server(stream, true).unwrap();
let mut recv_buf = vec![0; body.len()];
assert_eq!(s.poll_client(), Ok((stream, Event::Headers(resp))));
assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
assert_eq!(s.poll_client(), Err(Error::Done));
}
// Headers-only request; the response body is sent as four DATA frames, the
// last one finishing the stream. The client sees one `Data` event per frame.
#[test]
fn request_no_body_response_many_chunks() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(true).unwrap();
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
let total_data_frames = 4;
let resp = s.send_response(stream, false).unwrap();
for _ in 0..total_data_frames - 1 {
s.send_body_server(stream, false).unwrap();
}
let body = s.send_body_server(stream, true).unwrap();
let mut recv_buf = vec![0; body.len()];
assert_eq!(s.poll_client(), Ok((stream, Event::Headers(resp))));
for _ in 0..total_data_frames {
assert_eq!(s.poll_client(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_client(stream, &mut recv_buf), Ok(body.len()));
}
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
assert_eq!(s.poll_client(), Err(Error::Done));
}
// The request carries headers plus one DATA frame that finishes the stream;
// the response is headers only.
#[test]
fn request_one_chunk_response_no_body() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(false).unwrap();
let body = s.send_body_client(stream, true).unwrap();
let mut recv_buf = vec![0; body.len()];
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
let resp = s.send_response(stream, true).unwrap();
assert_eq!(s.poll_client(), Ok((stream, Event::Headers(resp))));
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
}
// The request body is sent as four DATA frames, the last one finishing the
// stream; the response is headers only.
#[test]
fn request_many_chunks_response_no_body() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(false).unwrap();
let total_data_frames = 4;
for _ in 0..total_data_frames - 1 {
s.send_body_client(stream, false).unwrap();
}
let body = s.send_body_client(stream, true).unwrap();
let mut recv_buf = vec![0; body.len()];
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
for _ in 0..total_data_frames {
assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
}
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
let resp = s.send_response(stream, true).unwrap();
assert_eq!(s.poll_client(), Ok((stream, Event::Headers(resp))));
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
}
// Three concurrent requests (streams 0, 4 and 8), each with a two-frame body,
// followed by headers-only responses. Streams may be reported in any order,
// so headers are looked up by `stream / 4`; within one stream the events must
// arrive as `Headers`, `Data`, `Data`, `Finished`.
#[test]
fn many_requests_many_chunks_response_one_chunk() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let mut reqs = Vec::new();
let (stream1, req1) = s.send_request(false).unwrap();
assert_eq!(stream1, 0);
reqs.push(req1);
let (stream2, req2) = s.send_request(false).unwrap();
assert_eq!(stream2, 4);
reqs.push(req2);
let (stream3, req3) = s.send_request(false).unwrap();
assert_eq!(stream3, 8);
reqs.push(req3);
let body = s.send_body_client(stream1, false).unwrap();
s.send_body_client(stream2, false).unwrap();
s.send_body_client(stream3, false).unwrap();
let mut recv_buf = vec![0; body.len()];
// The second body frames are sent in reverse stream order.
s.send_body_client(stream3, true).unwrap();
s.send_body_client(stream2, true).unwrap();
s.send_body_client(stream1, true).unwrap();
for _ in 0..reqs.len() {
let (stream, ev) = s.poll_server().unwrap();
assert_eq!(ev, Event::Headers(reqs[(stream / 4) as usize].clone()));
assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
assert_eq!(s.poll_server(), Ok((stream, Event::Data)));
assert_eq!(s.recv_body_server(stream, &mut recv_buf), Ok(body.len()));
assert_eq!(s.poll_server(), Ok((stream, Event::Finished)));
}
assert_eq!(s.poll_server(), Err(Error::Done));
let mut resps = Vec::new();
let resp1 = s.send_response(stream1, true).unwrap();
resps.push(resp1);
let resp2 = s.send_response(stream2, true).unwrap();
resps.push(resp2);
let resp3 = s.send_response(stream3, true).unwrap();
resps.push(resp3);
for _ in 0..resps.len() {
let (stream, ev) = s.poll_client().unwrap();
assert_eq!(ev, Event::Headers(resps[(stream / 4) as usize].clone()));
assert_eq!(s.poll_client(), Ok((stream, Event::Finished)));
}
assert_eq!(s.poll_client(), Err(Error::Done));
}
// `send_body()` must reject, with `FrameUnexpected`, a request stream the
// server has not seen yet (stream 0) as well as every local and peer
// control / QPACK stream.
#[test]
fn send_body_invalid_server_stream() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
assert_eq!(s.send_body_server(0, true), Err(Error::FrameUnexpected));
assert_eq!(
s.send_body_server(s.server.control_stream_id.unwrap(), true),
Err(Error::FrameUnexpected)
);
assert_eq!(
s.send_body_server(
s.server.local_qpack_streams.encoder_stream_id.unwrap(),
true
),
Err(Error::FrameUnexpected)
);
assert_eq!(
s.send_body_server(
s.server.local_qpack_streams.decoder_stream_id.unwrap(),
true
),
Err(Error::FrameUnexpected)
);
assert_eq!(
s.send_body_server(s.server.peer_control_stream_id.unwrap(), true),
Err(Error::FrameUnexpected)
);
assert_eq!(
s.send_body_server(
s.server.peer_qpack_streams.encoder_stream_id.unwrap(),
true
),
Err(Error::FrameUnexpected)
);
assert_eq!(
s.send_body_server(
s.server.peer_qpack_streams.decoder_stream_id.unwrap(),
true
),
Err(Error::FrameUnexpected)
);
}
// A MAX_PUSH_ID frame sent by the client on its control stream is accepted by
// the server and produces no event.
#[test]
fn max_push_id_from_client_good() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
s.send_frame_client(
frame::Frame::MaxPushId { push_id: 1 },
s.client.control_stream_id.unwrap(),
false,
)
.unwrap();
assert_eq!(s.poll_server(), Err(Error::Done));
}
// A MAX_PUSH_ID frame sent on a request stream is rejected by the server with
// `FrameUnexpected`, after the request headers have been reported.
#[test]
fn max_push_id_from_client_bad_stream() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(false).unwrap();
s.send_frame_client(
frame::Frame::MaxPushId { push_id: 2 },
stream,
false,
)
.unwrap();
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
}
// A second MAX_PUSH_ID frame with a lower value than the first is rejected by
// the server with `IdError`.
#[test]
fn max_push_id_from_client_limit_reduction() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
s.send_frame_client(
frame::Frame::MaxPushId { push_id: 2 },
s.client.control_stream_id.unwrap(),
false,
)
.unwrap();
s.send_frame_client(
frame::Frame::MaxPushId { push_id: 1 },
s.client.control_stream_id.unwrap(),
false,
)
.unwrap();
assert_eq!(s.poll_server(), Err(Error::IdError));
}
// A MAX_PUSH_ID frame sent by the server is rejected by the client with
// `FrameUnexpected`.
#[test]
fn max_push_id_from_server() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
s.send_frame_server(
frame::Frame::MaxPushId { push_id: 1 },
s.server.control_stream_id.unwrap(),
false,
)
.unwrap();
assert_eq!(s.poll_client(), Err(Error::FrameUnexpected));
}
// A PUSH_PROMISE frame sent by the client is rejected by the server with
// `FrameUnexpected`, after the request headers have been reported.
#[test]
fn push_promise_from_client() {
let mut s = Session::default().unwrap();
s.handshake().unwrap();
let (stream, req) = s.send_request(false).unwrap();
let header_block = s.client.encode_header_block(&req).unwrap();
s.send_frame_client(
frame::Frame::PushPromise {
push_id: 1,
header_block,
},
stream,
false,
)
.unwrap();
assert_eq!(s.poll_server(), Ok((stream, Event::Headers(req))));
assert_eq!(s.poll_server(), Err(Error::FrameUnexpected));
}
#[test]
/// Sends a CANCEL_PUSH frame from the client on the client's control stream
/// and expects the server to surface no event (`Error::Done`).
fn cancel_push_from_client() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let cancel_push = frame::Frame::CancelPush { push_id: 1 };
    let control_stream = session.client.control_stream_id.unwrap();
    session
        .send_frame_client(cancel_push, control_stream, false)
        .unwrap();

    let polled = session.poll_server();
    assert_eq!(polled, Err(Error::Done));
}
#[test]
/// Sends a CANCEL_PUSH frame from the client on the stream returned by
/// `send_request` instead of the control stream. The server is expected to
/// deliver the request headers and then fail with `Error::FrameUnexpected`.
fn cancel_push_from_client_bad_stream() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let (request_stream, request_headers) = session.send_request(false).unwrap();

    let misplaced_frame = frame::Frame::CancelPush { push_id: 2 };
    session
        .send_frame_client(misplaced_frame, request_stream, false)
        .unwrap();

    let headers_event = session.poll_server();
    assert_eq!(
        headers_event,
        Ok((request_stream, Event::Headers(request_headers)))
    );

    let failure = session.poll_server();
    assert_eq!(failure, Err(Error::FrameUnexpected));
}
#[test]
/// Sends a CANCEL_PUSH frame from the server on the server's control stream
/// and expects the client to surface no event (`Error::Done`).
fn cancel_push_from_server() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let cancel_push = frame::Frame::CancelPush { push_id: 1 };
    let control_stream = session.server.control_stream_id.unwrap();
    session
        .send_frame_server(cancel_push, control_stream, false)
        .unwrap();

    let polled = session.poll_client();
    assert_eq!(polled, Err(Error::Done));
}
#[test]
/// Sends a DUPLICATE_PUSH frame from the client on a request stream. The
/// server is expected to deliver the request headers and then fail with
/// `Error::FrameUnexpected`.
fn duplicate_push_from_client() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let (request_stream, request_headers) = session.send_request(false).unwrap();

    let duplicate_push = frame::Frame::DuplicatePush { push_id: 1 };
    session
        .send_frame_client(duplicate_push, request_stream, false)
        .unwrap();

    let headers_event = session.poll_server();
    assert_eq!(
        headers_event,
        Ok((request_stream, Event::Headers(request_headers)))
    );

    let failure = session.poll_server();
    assert_eq!(failure, Err(Error::FrameUnexpected));
}
#[test]
/// Sends a GOAWAY frame from the client on the client's control stream and
/// expects the server to reject it with `Error::FrameUnexpected`.
fn goaway_from_client() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let goaway = frame::Frame::GoAway { stream_id: 100 };
    let control_stream = session.client.control_stream_id.unwrap();
    session
        .send_frame_client(goaway, control_stream, false)
        .unwrap();

    // The outcome of advancing the session is deliberately ignored here.
    session.advance().ok();

    let polled = session.poll_server();
    assert_eq!(polled, Err(Error::FrameUnexpected));
}
#[test]
/// Sends a GOAWAY frame with stream ID 100 from the server on the server's
/// control stream and expects the client to surface no event
/// (`Error::Done`).
fn goaway_from_server_good() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let goaway = frame::Frame::GoAway { stream_id: 100 };
    let control_stream = session.server.control_stream_id.unwrap();
    session
        .send_frame_server(goaway, control_stream, false)
        .unwrap();

    let polled = session.poll_client();
    assert_eq!(polled, Err(Error::Done));
}
#[test]
/// Sends a GOAWAY frame with stream ID 1 from the server on the server's
/// control stream and expects the client to fail with
/// `Error::FrameUnexpected`.
// NOTE(review): ID 1 is presumably rejected because it is not a
// client-initiated bidirectional stream ID -- confirm against the GOAWAY
// handling code.
fn goaway_from_server_bad_id() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let goaway = frame::Frame::GoAway { stream_id: 1 };
    let control_stream = session.server.control_stream_id.unwrap();
    session
        .send_frame_server(goaway, control_stream, false)
        .unwrap();

    let polled = session.poll_client();
    assert_eq!(polled, Err(Error::FrameUnexpected));
}
#[test]
/// Checks the sequence of IDs handed out by `get_available_uni_stream`:
/// a connection created with `false` yields 2, 6, 10, 14, 18 and one created
/// with `true` yields 3, 7, 11, 15, 19.
// NOTE(review): the boolean passed to `Connection::new` is presumably an
// "is server" flag, going by the original local names -- confirm.
fn uni_stream_local_counting() {
    let config = Config::new().unwrap();

    let mut client_conn = Connection::new(&config, false).unwrap();
    for &expected_id in [2, 6, 10, 14, 18].iter() {
        let allocated = client_conn.get_available_uni_stream().unwrap();
        assert_eq!(allocated, expected_id);
    }

    let mut server_conn = Connection::new(&config, true).unwrap();
    for &expected_id in [3, 7, 11, 15, 19].iter() {
        let allocated = server_conn.get_available_uni_stream().unwrap();
        assert_eq!(allocated, expected_id);
    }
}
#[test]
/// After the handshake, opens an additional client unidirectional stream
/// and writes the control stream type ID on it. The server is expected to
/// fail with `Error::StreamCreationError`.
fn open_multiple_control_streams() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let extra_stream = session.client.get_available_uni_stream().unwrap();

    // Encode just the stream type varint into a scratch buffer.
    let mut scratch = [42; 8];
    let mut writer = octets::Octets::with_slice(&mut scratch);
    let stream_type = writer
        .put_varint(stream::HTTP3_CONTROL_STREAM_TYPE_ID)
        .unwrap();

    session
        .pipe
        .client
        .stream_send(extra_stream, stream_type, false)
        .unwrap();

    session.advance().ok();

    let polled = session.poll_server();
    assert_eq!(polled, Err(Error::StreamCreationError));
}
#[test]
/// Sends a MAX_PUSH_ID frame on the client's control stream with the last
/// argument of `send_frame_client` set to `true` (presumably the fin flag),
/// then polls the server until it reports `Error::ClosedCriticalStream`.
/// The test fails if the server reports `Error::Done` first.
fn close_control_stream() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let closing_frame = frame::Frame::MaxPushId { push_id: 1 };
    let control_stream = session.client.control_stream_id.unwrap();
    session
        .send_frame_client(closing_frame, control_stream, true)
        .unwrap();

    // Events and any other errors are skipped; only `Done` or
    // `ClosedCriticalStream` end the loop.
    // NOTE(review): a persistently returned unrelated error would make this
    // loop spin forever.
    let control_stream_closed = loop {
        match session.server.poll(&mut session.pipe.server) {
            Err(Error::ClosedCriticalStream) => break true,
            Err(Error::Done) => break false,
            Ok(_) | Err(_) => {},
        }
    };

    assert!(control_stream_closed);
}
#[test]
/// Writes to the client's local QPACK encoder stream twice, the second time
/// with the last argument of `stream_send` set to `true` (presumably the fin
/// flag), then polls the server until it reports
/// `Error::ClosedCriticalStream`. The test fails if the server reports
/// `Error::Done` first.
fn close_qpack_stream() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let encoder_stream = session
        .client
        .local_qpack_streams
        .encoder_stream_id
        .unwrap();

    let one_byte = [0; 1];
    session
        .pipe
        .client
        .stream_send(encoder_stream, &one_byte, false)
        .unwrap();
    session
        .pipe
        .client
        .stream_send(encoder_stream, &one_byte, true)
        .unwrap();

    session.advance().ok();

    // Events and any other errors are skipped; only `Done` or
    // `ClosedCriticalStream` end the loop.
    // NOTE(review): a persistently returned unrelated error would make this
    // loop spin forever.
    let qpack_stream_closed = loop {
        match session.server.poll(&mut session.pipe.server) {
            Err(Error::ClosedCriticalStream) => break true,
            Err(Error::Done) => break false,
            Ok(_) | Err(_) => {},
        }
    };

    assert!(qpack_stream_closed);
}
#[test]
/// Writes 20 zero bytes to each of the client's local QPACK encoder and
/// decoder streams and checks that polling the server never yields an error
/// other than `Error::Done`.
fn qpack_data() {
    let mut s = Session::default().unwrap();
    s.handshake().unwrap();

    let e_stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
    let d_stream_id = s.client.local_qpack_streams.decoder_stream_id.unwrap();

    let d = [0; 20];
    s.pipe.client.stream_send(e_stream_id, &d, false).unwrap();
    s.pipe.client.stream_send(d_stream_id, &d, false).unwrap();

    s.advance().ok();

    loop {
        match s.server.poll(&mut s.pipe.server) {
            // Any event is fine; keep draining.
            Ok(_) => (),

            // Nothing left to process: the test passes.
            Err(Error::Done) => break,

            // Report which error occurred so a failure can be diagnosed
            // from the test output alone.
            Err(e) => {
                panic!("unexpected error while polling the server: {:?}", e)
            },
        }
    }
}
#[test]
/// Sends, on stream 0, a frame header announcing a length of `1 << 24`
/// followed by 128 bytes of payload, for two different frame types:
///
/// * with the DATA frame type the server is expected to report
///   `Event::Data` on stream 0;
/// * with frame type `148_764_065_110_560_899` the server is expected to
///   fail with `Error::InternalError`.
// NOTE(review): the second type is presumably an unknown/GREASE frame whose
// payload must be buffered, which is what hits the size limit -- confirm
// against the stream state machine.
fn max_state_buf_size() {
    // Case 1: oversized DATA frame.
    {
        let mut session = Session::default().unwrap();
        session.handshake().unwrap();

        let mut payload = [42; 128];
        let mut writer = octets::Octets::with_slice(&mut payload);

        let type_bytes = writer.put_varint(frame::DATA_FRAME_TYPE_ID).unwrap();
        session
            .pipe
            .client
            .stream_send(0, type_bytes, false)
            .unwrap();

        let len_bytes = writer.put_varint(1 << 24).unwrap();
        session.pipe.client.stream_send(0, len_bytes, false).unwrap();

        session.pipe.client.stream_send(0, &payload, false).unwrap();

        session.advance().ok();

        let polled = session.server.poll(&mut session.pipe.server);
        assert_eq!(polled, Ok((0, Event::Data)));
    }

    // Case 2: oversized frame with the large numeric frame type.
    {
        let mut session = Session::default().unwrap();
        session.handshake().unwrap();

        let mut payload = [42; 128];
        let mut writer = octets::Octets::with_slice(&mut payload);

        let type_bytes = writer.put_varint(148_764_065_110_560_899).unwrap();
        session
            .pipe
            .client
            .stream_send(0, type_bytes, false)
            .unwrap();

        let len_bytes = writer.put_varint(1 << 24).unwrap();
        session.pipe.client.stream_send(0, len_bytes, false).unwrap();

        session.pipe.client.stream_send(0, &payload, false).unwrap();

        session.advance().ok();

        let polled = session.server.poll(&mut session.pipe.server);
        assert_eq!(polled, Err(Error::InternalError));
    }
}
#[test]
/// Sends a request followed by six 10-byte bodies that are each fully
/// accepted, then a seventh of which only 8 bytes are accepted. The server
/// is expected to see the headers, six full data reads, one 8-byte read,
/// and then `Error::Done`.
// NOTE(review): the short final write is presumably caused by the session's
// flow-control limits, and the trailing `Done` by the fin not being sent on
// a partial write -- confirm against `send_body`.
fn stream_backpressure() {
    let payload = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let full_len = payload.len();
    let partial_len = payload.len() - 2;

    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    let (stream, request_headers) = session.send_request(false).unwrap();

    let total_data_frames = 6;

    // Writes that are accepted in full.
    for _ in 0..total_data_frames {
        let written =
            session
                .client
                .send_body(&mut session.pipe.client, stream, &payload, false);
        assert_eq!(written, Ok(full_len));

        session.advance().ok();
    }

    // Final write: only part of the payload is accepted.
    let written =
        session
            .client
            .send_body(&mut session.pipe.client, stream, &payload, true);
    assert_eq!(written, Ok(partial_len));

    session.advance().ok();

    let mut recv_buf = vec![0; payload.len()];

    let headers_event = session.poll_server();
    assert_eq!(headers_event, Ok((stream, Event::Headers(request_headers))));

    for _ in 0..total_data_frames {
        let data_event = session.poll_server();
        assert_eq!(data_event, Ok((stream, Event::Data)));

        let read = session.recv_body_server(stream, &mut recv_buf);
        assert_eq!(read, Ok(full_len));
    }

    let data_event = session.poll_server();
    assert_eq!(data_event, Ok((stream, Event::Data)));

    let read = session.recv_body_server(stream, &mut recv_buf);
    assert_eq!(read, Ok(partial_len));

    let trailing = session.poll_server();
    assert_eq!(trailing, Err(Error::Done));
}
#[test]
/// Builds a session whose HTTP/3 config sets the max header list size to
/// 65, sends a request with five headers on stream 0, and expects the server
/// to fail with `Error::ExcessiveLoad`.
fn request_max_header_size_limit() {
    // Transport-level configuration.
    let mut quic_config = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    quic_config
        .load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    quic_config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    quic_config.set_application_protos(b"\x02h3").unwrap();
    quic_config.set_initial_max_data(1500);
    quic_config.set_initial_max_stream_data_bidi_local(150);
    quic_config.set_initial_max_stream_data_bidi_remote(150);
    quic_config.set_initial_max_stream_data_uni(150);
    quic_config.set_initial_max_streams_bidi(5);
    quic_config.set_initial_max_streams_uni(5);
    quic_config.verify_peer(false);

    // HTTP/3-level configuration carrying the limit under test.
    let mut http3_config = Config::new().unwrap();
    http3_config.set_max_header_list_size(65);

    let mut session =
        Session::with_configs(&mut quic_config, &mut http3_config).unwrap();
    session.handshake().unwrap();

    let request_headers = vec![
        Header::new(":method", "GET"),
        Header::new(":scheme", "https"),
        Header::new(":authority", "quic.tech"),
        Header::new(":path", "/test"),
        Header::new("aaaaaaa", "aaaaaaaa"),
    ];

    let stream = session
        .client
        .send_request(&mut session.pipe.client, &request_headers, true)
        .unwrap();

    session.advance().ok();

    assert_eq!(stream, 0);

    let polled = session.poll_server();
    assert_eq!(polled, Err(Error::ExcessiveLoad));
}
#[test]
/// Issues five requests that must succeed, then a sixth that is expected to
/// fail with `Error::TransportError(crate::Error::StreamLimit)`.
// NOTE(review): the limit of five presumably comes from the stream limits
// configured by `Session::default` -- confirm.
fn transport_error() {
    let mut session = Session::default().unwrap();
    session.handshake().unwrap();

    for _ in 0..5 {
        assert!(session.send_request(true).is_ok());
    }

    let over_limit = session.send_request(true);
    assert_eq!(
        over_limit,
        Err(Error::TransportError(crate::Error::StreamLimit))
    );
}
}
mod ffi;
mod frame;
#[doc(hidden)]
pub mod qpack;
mod stream;