#![allow(improper_ctypes)]
#![allow(clippy::suspicious_operation_groupings)]
#![allow(clippy::upper_case_acronyms)]
#![warn(missing_docs)]
#[macro_use]
extern crate log;
use std::cmp;
use std::time;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Mutex;
pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_DRAFT29;
const PROTOCOL_VERSION_DRAFT27: u32 = 0xff00_001b;
const PROTOCOL_VERSION_DRAFT28: u32 = 0xff00_001c;
const PROTOCOL_VERSION_DRAFT29: u32 = 0xff00_001d;
pub const MAX_CONN_ID_LEN: usize = crate::packet::MAX_CID_LEN as usize;
pub const MIN_CLIENT_INITIAL_LEN: usize = 1200;
#[cfg(not(feature = "fuzzing"))]
const PAYLOAD_MIN_LEN: usize = 4;
#[cfg(feature = "fuzzing")]
const PAYLOAD_MIN_LEN: usize = 20;
const MAX_AMPLIFICATION_FACTOR: usize = 3;
const MAX_ACK_RANGES: usize = 68;
const MAX_STREAM_ID: u64 = 1 << 60;
const MAX_SEND_UDP_PAYLOAD_SIZE: usize = 1200;
const DEFAULT_MAX_DGRAM_QUEUE_LEN: usize = 0;
const MAX_DGRAM_FRAME_SIZE: u64 = 65536;
const PAYLOAD_LENGTH_LEN: usize = 2;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Error {
Done,
BufferTooShort,
UnknownVersion,
InvalidFrame,
InvalidPacket,
InvalidState,
InvalidStreamState,
InvalidTransportParam,
CryptoFail,
TlsFail,
FlowControl,
StreamLimit,
StreamStopped(u64),
FinalSize,
CongestionControl,
}
impl Error {
fn to_wire(self) -> u64 {
match self {
Error::Done => 0x0,
Error::InvalidFrame => 0x7,
Error::InvalidStreamState => 0x5,
Error::InvalidTransportParam => 0x8,
Error::FlowControl => 0x3,
Error::StreamLimit => 0x4,
Error::FinalSize => 0x6,
_ => 0xa,
}
}
#[cfg(feature = "ffi")]
fn to_c(self) -> libc::ssize_t {
match self {
Error::Done => -1,
Error::BufferTooShort => -2,
Error::UnknownVersion => -3,
Error::InvalidFrame => -4,
Error::InvalidPacket => -5,
Error::InvalidState => -6,
Error::InvalidStreamState => -7,
Error::InvalidTransportParam => -8,
Error::CryptoFail => -9,
Error::TlsFail => -10,
Error::FlowControl => -11,
Error::StreamLimit => -12,
Error::FinalSize => -13,
Error::CongestionControl => -14,
Error::StreamStopped { .. } => -15,
}
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl std::convert::From<octets::BufferTooShortError> for Error {
fn from(_err: octets::BufferTooShortError) -> Self {
Error::BufferTooShort
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct ConnectionError {
is_app: bool,
error_code: u64,
reason: Vec<u8>,
}
#[repr(C)]
pub enum Shutdown {
Read = 0,
Write = 1,
}
pub struct Config {
local_transport_params: TransportParams,
version: u32,
tls_ctx: Mutex<tls::Context>,
application_protos: Vec<Vec<u8>>,
grease: bool,
cc_algorithm: CongestionControlAlgorithm,
hystart: bool,
dgram_recv_max_queue_len: usize,
dgram_send_max_queue_len: usize,
max_send_udp_payload_size: usize,
}
impl Config {
pub fn new(version: u32) -> Result<Config> {
let tls_ctx = Mutex::new(tls::Context::new()?);
Ok(Config {
local_transport_params: TransportParams::default(),
version,
tls_ctx,
application_protos: Vec::new(),
grease: true,
cc_algorithm: CongestionControlAlgorithm::CUBIC,
hystart: true,
dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
max_send_udp_payload_size: MAX_SEND_UDP_PAYLOAD_SIZE,
})
}
pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx
.lock()
.unwrap()
.use_certificate_chain_file(file)
}
pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx.lock().unwrap().use_privkey_file(file)
}
pub fn load_verify_locations_from_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx
.lock()
.unwrap()
.load_verify_locations_from_file(file)
}
pub fn load_verify_locations_from_directory(
&mut self, dir: &str,
) -> Result<()> {
self.tls_ctx
.lock()
.unwrap()
.load_verify_locations_from_directory(dir)
}
pub fn verify_peer(&mut self, verify: bool) {
self.tls_ctx.lock().unwrap().set_verify(verify);
}
pub fn grease(&mut self, grease: bool) {
self.grease = grease;
}
pub fn log_keys(&mut self) {
self.tls_ctx.lock().unwrap().enable_keylog();
}
pub fn enable_early_data(&mut self) {
self.tls_ctx.lock().unwrap().set_early_data_enabled(true);
}
pub fn set_application_protos(&mut self, protos: &[u8]) -> Result<()> {
let mut b = octets::Octets::with_slice(&protos);
let mut protos_list = Vec::new();
while let Ok(proto) = b.get_bytes_with_u8_length() {
protos_list.push(proto.to_vec());
}
self.application_protos = protos_list;
self.tls_ctx
.lock()
.unwrap()
.set_alpn(&self.application_protos)
}
pub fn set_max_idle_timeout(&mut self, v: u64) {
self.local_transport_params.max_idle_timeout = v;
}
pub fn set_max_recv_udp_payload_size(&mut self, v: usize) {
self.local_transport_params.max_udp_payload_size = v as u64;
}
pub fn set_max_send_udp_payload_size(&mut self, v: usize) {
self.max_send_udp_payload_size = cmp::max(v, MAX_SEND_UDP_PAYLOAD_SIZE);
}
pub fn set_initial_max_data(&mut self, v: u64) {
self.local_transport_params.initial_max_data = v;
}
pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_local = v;
}
pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_remote = v;
}
pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_stream_data_uni = v;
}
pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_bidi = v;
}
pub fn set_initial_max_streams_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_uni = v;
}
pub fn set_ack_delay_exponent(&mut self, v: u64) {
self.local_transport_params.ack_delay_exponent = v;
}
pub fn set_max_ack_delay(&mut self, v: u64) {
self.local_transport_params.max_ack_delay = v;
}
pub fn set_disable_active_migration(&mut self, v: bool) {
self.local_transport_params.disable_active_migration = v;
}
pub fn set_cc_algorithm_name(&mut self, name: &str) -> Result<()> {
self.cc_algorithm = CongestionControlAlgorithm::from_str(name)?;
Ok(())
}
pub fn set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm) {
self.cc_algorithm = algo;
}
pub fn enable_hystart(&mut self, v: bool) {
self.hystart = v;
}
pub fn enable_dgram(
&mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize,
) {
self.local_transport_params.max_datagram_frame_size = if enabled {
Some(MAX_DGRAM_FRAME_SIZE)
} else {
None
};
self.dgram_recv_max_queue_len = recv_queue_len;
self.dgram_send_max_queue_len = send_queue_len;
}
}
pub struct Connection {
version: u32,
dcid: ConnectionId<'static>,
scid: ConnectionId<'static>,
trace_id: String,
pkt_num_spaces: [packet::PktNumSpace; packet::EPOCH_COUNT],
peer_transport_params: TransportParams,
local_transport_params: TransportParams,
handshake: Mutex<tls::Handshake>,
recovery: recovery::Recovery,
application_protos: Vec<Vec<u8>>,
recv_count: usize,
sent_count: usize,
rx_data: u64,
max_rx_data: u64,
max_rx_data_next: u64,
almost_full: bool,
tx_cap: usize,
tx_data: u64,
max_tx_data: u64,
max_send_bytes: usize,
streams: stream::StreamMap,
odcid: Option<ConnectionId<'static>>,
rscid: Option<ConnectionId<'static>>,
token: Option<Vec<u8>>,
local_error: Option<ConnectionError>,
peer_error: Option<ConnectionError>,
challenge: Option<Vec<u8>>,
blocked_limit: Option<u64>,
idle_timer: Option<time::Instant>,
draining_timer: Option<time::Instant>,
alpn: Vec<u8>,
is_server: bool,
derived_initial_secrets: bool,
did_version_negotiation: bool,
did_retry: bool,
got_peer_conn_id: bool,
verified_peer_address: bool,
peer_verified_address: bool,
parsed_peer_transport_params: bool,
handshake_completed: bool,
handshake_done_sent: bool,
handshake_confirmed: bool,
ack_eliciting_sent: bool,
closed: bool,
grease: bool,
keylog: Option<Box<dyn std::io::Write + Send + Sync>>,
#[cfg(feature = "qlog")]
qlog_streamer: Option<qlog::QlogStreamer>,
#[cfg(feature = "qlog")]
qlogged_peer_params: bool,
dgram_recv_queue: dgram::DatagramQueue,
dgram_send_queue: dgram::DatagramQueue,
}
#[inline]
pub fn accept(
scid: &ConnectionId, odcid: Option<&ConnectionId>, config: &mut Config,
) -> Result<Pin<Box<Connection>>> {
let conn = Connection::new(scid, odcid, config, true)?;
Ok(conn)
}
#[inline]
pub fn connect(
server_name: Option<&str>, scid: &ConnectionId, config: &mut Config,
) -> Result<Pin<Box<Connection>>> {
let conn = Connection::new(scid, None, config, false)?;
if let Some(server_name) = server_name {
conn.handshake.lock().unwrap().set_host_name(server_name)?;
}
Ok(conn)
}
#[inline]
pub fn negotiate_version(
scid: &ConnectionId, dcid: &ConnectionId, out: &mut [u8],
) -> Result<usize> {
packet::negotiate_version(scid, dcid, out)
}
#[inline]
pub fn retry(
scid: &ConnectionId, dcid: &ConnectionId, new_scid: &ConnectionId,
token: &[u8], version: u32, out: &mut [u8],
) -> Result<usize> {
packet::retry(scid, dcid, new_scid, token, version, out)
}
#[inline]
pub fn version_is_supported(version: u32) -> bool {
matches!(
version,
PROTOCOL_VERSION_DRAFT27 |
PROTOCOL_VERSION_DRAFT28 |
PROTOCOL_VERSION_DRAFT29
)
}
macro_rules! push_frame_to_pkt {
($out:expr, $frames:expr, $frame:expr, $left:expr) => {{
if $frame.wire_len() <= $left {
$left -= $frame.wire_len();
$frame.to_bytes(&mut $out)?;
$frames.push($frame);
true
} else {
false
}
}};
}
macro_rules! qlog_with {
($qlog_streamer:expr, $qlog_streamer_ref:ident, $body:block) => {{
#[cfg(feature = "qlog")]
{
if let Some($qlog_streamer_ref) = &mut $qlog_streamer {
$body
}
}
}};
}
impl Connection {
fn new(
scid: &ConnectionId, odcid: Option<&ConnectionId>, config: &mut Config,
is_server: bool,
) -> Result<Pin<Box<Connection>>> {
let tls = config.tls_ctx.lock().unwrap().new_handshake()?;
Connection::with_tls(scid, odcid, config, tls, is_server)
}
fn with_tls(
scid: &ConnectionId, odcid: Option<&ConnectionId>, config: &mut Config,
tls: tls::Handshake, is_server: bool,
) -> Result<Pin<Box<Connection>>> {
let max_rx_data = config.local_transport_params.initial_max_data;
let scid_as_hex: Vec<String> =
scid.iter().map(|b| format!("{:02x}", b)).collect();
let mut conn = Box::pin(Connection {
version: config.version,
dcid: ConnectionId::default(),
scid: scid.to_vec().into(),
trace_id: scid_as_hex.join(""),
pkt_num_spaces: [
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
],
peer_transport_params: TransportParams::default(),
local_transport_params: config.local_transport_params.clone(),
handshake: Mutex::new(tls),
recovery: recovery::Recovery::new(&config),
application_protos: config.application_protos.clone(),
recv_count: 0,
sent_count: 0,
rx_data: 0,
max_rx_data,
max_rx_data_next: max_rx_data,
almost_full: false,
tx_cap: 0,
tx_data: 0,
max_tx_data: 0,
max_send_bytes: 0,
streams: stream::StreamMap::new(
config.local_transport_params.initial_max_streams_bidi,
config.local_transport_params.initial_max_streams_uni,
),
odcid: None,
rscid: None,
token: None,
local_error: None,
peer_error: None,
challenge: None,
blocked_limit: None,
idle_timer: None,
draining_timer: None,
alpn: Vec::new(),
is_server,
derived_initial_secrets: false,
did_version_negotiation: false,
did_retry: false,
got_peer_conn_id: false,
verified_peer_address: odcid.is_some(),
peer_verified_address: is_server,
parsed_peer_transport_params: false,
handshake_completed: false,
handshake_done_sent: false,
handshake_confirmed: false,
ack_eliciting_sent: false,
closed: false,
grease: config.grease,
keylog: None,
#[cfg(feature = "qlog")]
qlog_streamer: None,
#[cfg(feature = "qlog")]
qlogged_peer_params: false,
dgram_recv_queue: dgram::DatagramQueue::new(
config.dgram_recv_max_queue_len,
),
dgram_send_queue: dgram::DatagramQueue::new(
config.dgram_send_max_queue_len,
),
});
if let Some(odcid) = odcid {
conn.local_transport_params
.original_destination_connection_id = Some(odcid.to_vec().into());
conn.local_transport_params.retry_source_connection_id =
Some(scid.to_vec().into());
conn.did_retry = true;
}
conn.local_transport_params.initial_source_connection_id =
Some(scid.to_vec().into());
conn.handshake.lock().unwrap().init(&conn)?;
conn.handshake.lock().unwrap().use_legacy_codepoint(true);
conn.encode_transport_params()?;
if !is_server {
let mut dcid = [0; 16];
rand::rand_bytes(&mut dcid[..]);
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&dcid,
conn.version,
conn.is_server,
)?;
conn.dcid = dcid.to_vec().into();
conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
conn.derived_initial_secrets = true;
}
Ok(conn)
}
#[inline]
pub fn set_keylog(&mut self, writer: Box<dyn std::io::Write + Send + Sync>) {
self.keylog = Some(writer);
}
#[cfg(feature = "qlog")]
pub fn set_qlog(
&mut self, writer: Box<dyn std::io::Write + Send + Sync>, title: String,
description: String,
) {
let vp = if self.is_server {
qlog::VantagePointType::Server
} else {
qlog::VantagePointType::Client
};
let trace = qlog::Trace::new(
qlog::VantagePoint {
name: None,
ty: vp,
flow: None,
},
Some(title.to_string()),
Some(description.to_string()),
Some(qlog::Configuration {
time_offset: Some("0".to_string()),
time_units: Some(qlog::TimeUnits::Ms),
original_uris: None,
}),
None,
);
let mut streamer = qlog::QlogStreamer::new(
qlog::QLOG_VERSION.to_string(),
Some(title),
Some(description),
None,
std::time::Instant::now(),
trace,
writer,
);
streamer.start_log().ok();
let handshake = self.handshake.lock().unwrap();
let ev = self.local_transport_params.to_qlog(
qlog::TransportOwner::Local,
self.version,
handshake.alpn_protocol(),
handshake.cipher(),
);
streamer.add_event(ev).ok();
self.qlog_streamer = Some(streamer);
}
pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
let len = buf.len();
if len == 0 {
return Err(Error::BufferTooShort);
}
if !self.verified_peer_address {
self.max_send_bytes += len * MAX_AMPLIFICATION_FACTOR;
}
let mut done = 0;
let mut left = len;
while left > 0 {
let read = match self.recv_single(&mut buf[len - left..len]) {
Ok(v) => v,
Err(Error::Done) => left,
Err(e) => {
self.close(false, e.to_wire(), b"").ok();
return Err(e);
},
};
done += read;
left -= read;
}
Ok(done)
}
fn recv_single(&mut self, buf: &mut [u8]) -> Result<usize> {
let now = time::Instant::now();
if buf.is_empty() {
return Err(Error::Done);
}
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
let is_closing = self.local_error.is_some();
if is_closing {
return Err(Error::Done);
}
let mut b = octets::OctetsMut::with_slice(buf);
let mut hdr =
Header::from_bytes(&mut b, self.scid.len()).map_err(|e| {
drop_pkt_on_err(
e,
self.recv_count,
self.is_server,
&self.trace_id,
)
})?;
if hdr.ty == packet::Type::VersionNegotiation {
if self.is_server {
return Err(Error::Done);
}
if self.did_version_negotiation {
return Err(Error::Done);
}
if self.recv_count > 0 {
return Err(Error::Done);
}
if hdr.dcid != self.scid {
return Err(Error::Done);
}
if hdr.scid != self.dcid {
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
let versions = hdr.versions.ok_or(Error::Done)?;
if versions.iter().any(|&v| v == self.version) {
return Err(Error::Done);
}
match versions.iter().filter(|&&v| version_is_supported(v)).max() {
Some(v) => self.version = *v,
None => {
return Err(Error::UnknownVersion);
},
};
self.did_version_negotiation = true;
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&self.dcid,
self.version,
self.is_server,
)?;
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
self.got_peer_conn_id = false;
self.handshake.lock().unwrap().clear()?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
self.handshake.lock().unwrap().use_legacy_codepoint(true);
self.encode_transport_params()?;
return Err(Error::Done);
}
if hdr.ty == packet::Type::Retry {
if self.is_server {
return Err(Error::Done);
}
if self.did_retry {
return Err(Error::Done);
}
if packet::verify_retry_integrity(&b, &self.dcid, self.version)
.is_err()
{
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
self.token = hdr.token;
self.did_retry = true;
self.odcid = Some(self.dcid.clone());
self.dcid = hdr.scid.clone();
self.rscid = Some(self.dcid.clone());
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&hdr.scid,
self.version,
self.is_server,
)?;
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
self.got_peer_conn_id = false;
self.handshake.lock().unwrap().clear()?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
return Err(Error::Done);
}
if self.is_server && !self.did_version_negotiation {
if !version_is_supported(hdr.version) {
return Err(Error::UnknownVersion);
}
self.version = hdr.version;
self.did_version_negotiation = true;
self.encode_transport_params()?;
}
if hdr.ty != packet::Type::Short && hdr.version != self.version {
return Err(Error::Done);
}
let payload_len = if hdr.ty == packet::Type::Short {
b.cap()
} else {
b.get_varint().map_err(|e| {
drop_pkt_on_err(
e.into(),
self.recv_count,
self.is_server,
&self.trace_id,
)
})? as usize
};
if !self.derived_initial_secrets {
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&hdr.dcid,
self.version,
self.is_server,
)?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
self.derived_initial_secrets = true;
}
let epoch = hdr.ty.to_epoch()?;
#[allow(clippy::or_fun_call)]
let aead = (self.pkt_num_spaces[epoch].crypto_0rtt_open.as_ref())
.filter(|_| hdr.ty == packet::Type::ZeroRTT)
.or(self.pkt_num_spaces[epoch].crypto_open.as_ref())
.ok_or_else(|| {
drop_pkt_on_err(
Error::CryptoFail,
self.recv_count,
self.is_server,
&self.trace_id,
)
})?;
let aead_tag_len = aead.alg().tag_len();
packet::decrypt_hdr(&mut b, &mut hdr, &aead).map_err(|e| {
drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
})?;
let pn = packet::decode_pkt_num(
self.pkt_num_spaces[epoch].largest_rx_pkt_num,
hdr.pkt_num,
hdr.pkt_num_len,
);
let pn_len = hdr.pkt_num_len;
trace!(
"{} rx pkt {:?} len={} pn={}",
self.trace_id,
hdr,
payload_len,
pn
);
qlog_with!(self.qlog_streamer, q, {
let packet_size = b.len();
let qlog_pkt_hdr = qlog::PacketHeader::with_type(
hdr.ty.to_qlog(),
pn,
Some(packet_size as u64),
Some(payload_len as u64),
Some(hdr.version),
Some(&hdr.scid),
Some(&hdr.dcid),
);
q.add_event(qlog::event::Event::packet_received(
hdr.ty.to_qlog(),
qlog_pkt_hdr,
Some(Vec::new()),
None,
None,
None,
))
.ok();
});
let mut payload = packet::decrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
&aead,
)
.map_err(|e| {
drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
})?;
if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
trace!("{} ignored duplicate packet {}", self.trace_id, pn);
return Err(Error::Done);
}
if payload.cap() == 0 {
return Err(Error::InvalidPacket);
}
if !self.is_server && !self.got_peer_conn_id {
if self.odcid.is_none() {
self.odcid = Some(self.dcid.clone());
}
self.dcid = hdr.scid.clone();
self.got_peer_conn_id = true;
}
if self.is_server && !self.got_peer_conn_id {
self.dcid = hdr.scid.clone();
if !self.did_retry && self.version >= PROTOCOL_VERSION_DRAFT28 {
self.local_transport_params
.original_destination_connection_id =
Some(hdr.dcid.to_vec().into());
self.encode_transport_params()?;
}
self.got_peer_conn_id = true;
}
let mut ack_elicited = false;
while payload.cap() > 0 {
let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
qlog_with!(self.qlog_streamer, q, {
q.add_frame(frame.to_qlog(), false).ok();
});
if frame.ack_eliciting() {
ack_elicited = true;
}
if let Err(e) = self.process_frame(frame, epoch, now) {
qlog_with!(self.qlog_streamer, q, {
q.finish_frames().ok();
});
return Err(e);
}
}
qlog_with!(self.qlog_streamer, q, {
q.finish_frames().ok();
});
qlog_with!(self.qlog_streamer, q, {
let ev = self.recovery.to_qlog();
q.add_event(ev).ok();
});
if self.is_established() {
qlog_with!(self.qlog_streamer, q, {
if !self.qlogged_peer_params {
let handshake = self.handshake.lock().unwrap();
let ev = self.peer_transport_params.to_qlog(
qlog::TransportOwner::Remote,
self.version,
handshake.alpn_protocol(),
handshake.cipher(),
);
q.add_event(ev).ok();
self.qlogged_peer_params = true;
}
});
}
for acked in self.recovery.acked[epoch].drain(..) {
match acked {
frame::Frame::ACK { ranges, .. } => {
if let Some(largest_acked) = ranges.last() {
self.pkt_num_spaces[epoch]
.recv_pkt_need_ack
.remove_until(largest_acked);
}
},
frame::Frame::CryptoHeader { offset, length } => {
self.pkt_num_spaces[epoch]
.crypto_stream
.send
.ack_and_drop(offset, length);
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
..
} => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
stream.send.ack_and_drop(offset, length);
if stream.is_complete() && !stream.is_readable() {
let local = stream.local;
self.streams.collect(stream_id, local);
}
},
frame::Frame::ResetStream { stream_id, .. } => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
if stream.is_complete() && !stream.is_readable() {
let local = stream.local;
self.streams.collect(stream_id, local);
}
},
_ => (),
}
}
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.last() < Some(pn) {
self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
}
self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);
self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);
self.pkt_num_spaces[epoch].ack_elicited =
cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);
self.pkt_num_spaces[epoch].largest_rx_pkt_num =
cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);
if let Some(idle_timeout) = self.idle_timeout() {
self.idle_timer = Some(now + idle_timeout);
}
self.tx_cap = cmp::min(
self.recovery.cwnd_available() as u64,
self.max_tx_data - self.tx_data,
) as usize;
self.recv_count += 1;
let read = b.off() + aead_tag_len;
if self.is_server && hdr.ty == packet::Type::Handshake {
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
self.verified_peer_address = true;
}
self.ack_eliciting_sent = false;
Ok(read)
}
pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
if out.is_empty() {
return Err(Error::BufferTooShort);
}
let mut has_initial = false;
let mut done = 0;
let mut left = cmp::min(out.len(), self.max_send_udp_payload_size());
if !self.verified_peer_address && self.is_server {
left = cmp::min(left, self.max_send_bytes);
}
while left > 0 {
let (ty, written) =
match self.send_single(&mut out[done..done + left]) {
Ok(v) => v,
Err(Error::BufferTooShort) | Err(Error::Done) => break,
Err(e) => return Err(e),
};
done += written;
left -= written;
match ty {
packet::Type::Initial => has_initial = true,
packet::Type::Short => break,
_ => (),
};
}
if done == 0 {
return Err(Error::Done);
}
if has_initial && left > 0 && done < MIN_CLIENT_INITIAL_LEN {
let pad_len = cmp::min(left, MIN_CLIENT_INITIAL_LEN - done);
out[done..done + pad_len].fill(0);
done += pad_len;
}
Ok(done)
}
fn send_single(&mut self, out: &mut [u8]) -> Result<(packet::Type, usize)> {
let now = time::Instant::now();
if out.is_empty() {
return Err(Error::BufferTooShort);
}
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
if !self.derived_initial_secrets {
return Err(Error::Done);
}
let is_closing = self.local_error.is_some();
if !is_closing {
self.do_handshake()?;
}
let mut b = octets::OctetsMut::with_slice(out);
let epoch = self.write_epoch()?;
let pkt_type = packet::Type::from_epoch(epoch);
for lost in self.recovery.lost[epoch].drain(..) {
match lost {
frame::Frame::CryptoHeader { offset, length } => {
self.pkt_num_spaces[epoch]
.crypto_stream
.send
.retransmit(offset, length);
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
fin,
} => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
let was_flushable = stream.is_flushable();
let empty_fin = length == 0 && fin;
stream.send.retransmit(offset, length);
if (stream.is_flushable() || empty_fin) && !was_flushable {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(
stream_id,
urgency,
incremental,
);
}
},
frame::Frame::ACK { .. } => {
self.pkt_num_spaces[epoch].ack_elicited = true;
},
frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
} =>
if self.streams.get(stream_id).is_some() {
self.streams
.mark_reset(stream_id, true, error_code, final_size);
},
frame::Frame::HandshakeDone => {
self.handshake_done_sent = false;
},
frame::Frame::MaxStreamData { stream_id, .. } => {
if self.streams.get(stream_id).is_some() {
self.streams.mark_almost_full(stream_id, true);
}
},
frame::Frame::MaxData { .. } => {
self.almost_full = true;
},
_ => (),
}
}
let mut left = b.cap();
left = cmp::min(left, self.recovery.cwnd_available());
let pn = self.pkt_num_spaces[epoch].next_pkt_num;
let pn_len = packet::pkt_num_len(pn)?;
let crypto_overhead = self.pkt_num_spaces[epoch]
.crypto_overhead()
.ok_or(Error::Done)?;
let hdr = Header {
ty: pkt_type,
version: self.version,
dcid: ConnectionId::from_ref(&self.dcid),
scid: ConnectionId::from_ref(&self.scid),
pkt_num: 0,
pkt_num_len: pn_len,
token: if pkt_type == packet::Type::Initial {
self.token.clone()
} else {
None
},
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b)?;
let mut overhead = b.off() + pn_len + crypto_overhead;
if pkt_type != packet::Type::Short {
overhead += 2;
}
match left.checked_sub(overhead) {
Some(v) => left = v,
None => {
self.recovery.update_app_limited(false);
return Err(Error::Done);
},
}
if left < PAYLOAD_MIN_LEN {
self.recovery.update_app_limited(false);
return Err(Error::Done);
}
let mut frames: Vec<frame::Frame> = Vec::new();
let mut ack_eliciting = false;
let mut in_flight = false;
let mut has_data = false;
let header_offset = b.off();
if pkt_type != packet::Type::Short {
b.skip(PAYLOAD_LENGTH_LEN)?;
}
packet::encode_pkt_num(pn, &mut b)?;
let payload_offset = b.off();
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.len() > 0 &&
(self.pkt_num_spaces[epoch].ack_elicited ||
self.recovery.loss_probes[epoch] > 0) &&
!is_closing
{
let ack_delay =
self.pkt_num_spaces[epoch].largest_rx_pkt_time.elapsed();
let ack_delay = ack_delay.as_micros() as u64 /
2_u64
.pow(self.local_transport_params.ack_delay_exponent as u32);
let frame = frame::Frame::ACK {
ack_delay,
ranges: self.pkt_num_spaces[epoch].recv_pkt_need_ack.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.pkt_num_spaces[epoch].ack_elicited = false;
}
}
if pkt_type == packet::Type::Short && !is_closing {
if self.is_established() &&
!self.handshake_done_sent &&
self.is_server
{
let frame = frame::Frame::HandshakeDone;
if push_frame_to_pkt!(b, frames, frame, left) {
self.handshake_done_sent = true;
ack_eliciting = true;
in_flight = true;
}
}
if self.streams.should_update_max_streams_bidi() {
let frame = frame::Frame::MaxStreamsBidi {
max: self.streams.max_streams_bidi_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_bidi();
ack_eliciting = true;
in_flight = true;
}
}
if self.streams.should_update_max_streams_uni() {
let frame = frame::Frame::MaxStreamsUni {
max: self.streams.max_streams_uni_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_uni();
ack_eliciting = true;
in_flight = true;
}
}
if let Some(limit) = self.blocked_limit {
let frame = frame::Frame::DataBlocked { limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.blocked_limit = None;
ack_eliciting = true;
in_flight = true;
}
}
for stream_id in self.streams.almost_full() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => {
self.streams.mark_almost_full(stream_id, false);
continue;
},
};
let frame = frame::Frame::MaxStreamData {
stream_id,
max: stream.recv.max_data_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
stream.recv.update_max_data();
self.streams.mark_almost_full(stream_id, false);
ack_eliciting = true;
in_flight = true;
self.almost_full = true;
}
}
if self.almost_full && self.max_rx_data < self.max_rx_data_next {
let frame = frame::Frame::MaxData {
max: self.max_rx_data_next,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.almost_full = false;
self.max_rx_data = self.max_rx_data_next;
ack_eliciting = true;
in_flight = true;
}
}
for (stream_id, error_code) in self
.streams
.stopped()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StopSending {
stream_id,
error_code,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_stopped(stream_id, false, 0);
ack_eliciting = true;
in_flight = true;
}
}
for (stream_id, (error_code, final_size)) in self
.streams
.reset()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, (u64, u64))>>()
{
let frame = frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_reset(stream_id, false, 0, 0);
ack_eliciting = true;
in_flight = true;
}
}
for (stream_id, limit) in self
.streams
.blocked()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StreamDataBlocked { stream_id, limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.mark_blocked(stream_id, false, 0);
ack_eliciting = true;
in_flight = true;
}
}
}
if let Some(conn_err) = self.local_error.as_ref() {
if conn_err.is_app {
if pkt_type == packet::Type::Short {
let frame = frame::Frame::ApplicationClose {
error_code: conn_err.error_code,
reason: conn_err.reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.draining_timer =
Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
} else {
let frame = frame::Frame::ConnectionClose {
error_code: conn_err.error_code,
frame_type: 0,
reason: conn_err.reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.draining_timer = Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
}
if let Some(ref challenge) = self.challenge {
let frame = frame::Frame::PathResponse {
data: challenge.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.challenge = None;
ack_eliciting = true;
in_flight = true;
}
}
if self.pkt_num_spaces[epoch].crypto_stream.is_flushable() &&
left > frame::MAX_CRYPTO_OVERHEAD &&
!is_closing
{
let max_len = left - frame::MAX_CRYPTO_OVERHEAD;
let crypto_off =
self.pkt_num_spaces[epoch].crypto_stream.send.off_front();
let hdr_off = b.off();
let hdr_len = 1 +
octets::varint_len(crypto_off) +
2;
let (mut crypto_hdr, mut crypto_payload) =
b.split_at(hdr_off + hdr_len)?;
let (len, _) = self.pkt_num_spaces[epoch]
.crypto_stream
.send
.emit(&mut crypto_payload.as_mut()[..max_len])?;
crypto_hdr.skip(hdr_off)?;
frame::encode_crypto_header(crypto_off, len as u64, &mut crypto_hdr)?;
b.skip(hdr_len + len)?;
let frame = frame::Frame::CryptoHeader {
offset: crypto_off,
length: len,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
}
if pkt_type == packet::Type::Short &&
left > frame::MAX_DGRAM_OVERHEAD &&
!is_closing
{
if let Some(max_dgram_payload) = self.dgram_max_writable_len() {
while let Some(len) = self.dgram_send_queue.peek_front_len() {
if (len + frame::MAX_DGRAM_OVERHEAD) <= left {
match self.dgram_send_queue.pop() {
Some(data) => {
let frame = frame::Frame::Datagram { data };
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
},
None => continue,
};
} else if len > max_dgram_payload {
self.dgram_send_queue.pop();
} else {
break;
}
}
}
}
if pkt_type == packet::Type::Short &&
left > frame::MAX_STREAM_OVERHEAD &&
!is_closing
{
while let Some(stream_id) = self.streams.pop_flushable() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
if stream.send.is_stopped() {
continue;
}
let stream_off = stream.send.off_front();
let hdr_off = b.off();
let hdr_len = 1 +
octets::varint_len(stream_id) +
octets::varint_len(stream_off) +
2;
let max_len = match left.checked_sub(hdr_len) {
Some(v) => v,
None => continue,
};
let (mut stream_hdr, mut stream_payload) =
b.split_at(hdr_off + hdr_len)?;
let (len, fin) =
stream.send.emit(&mut stream_payload.as_mut()[..max_len])?;
stream_hdr.skip(hdr_off)?;
frame::encode_stream_header(
stream_id,
stream_off,
len as u64,
fin,
&mut stream_hdr,
)?;
b.skip(hdr_len + len)?;
let frame = frame::Frame::StreamHeader {
stream_id,
offset: stream_off,
length: len,
fin,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
if stream.is_flushable() {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(stream_id, urgency, incremental);
}
if cfg!(feature = "fuzzing") && left > frame::MAX_STREAM_OVERHEAD
{
continue;
}
break;
}
}
if self.recovery.loss_probes[epoch] > 0 &&
!ack_eliciting &&
left >= 1 &&
!is_closing
{
let frame = frame::Frame::Ping;
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
}
if ack_eliciting {
self.recovery.loss_probes[epoch] =
self.recovery.loss_probes[epoch].saturating_sub(1);
}
if frames.is_empty() {
self.recovery.update_app_limited(false);
return Err(Error::Done);
}
if b.off() - payload_offset < PAYLOAD_MIN_LEN {
let payload_len = b.off() - payload_offset;
let frame = frame::Frame::Padding {
len: PAYLOAD_MIN_LEN - payload_len,
};
#[allow(unused_assignments)]
if push_frame_to_pkt!(b, frames, frame, left) {
in_flight = true;
}
}
let payload_len = b.off() - payload_offset;
let payload_len = payload_len + crypto_overhead;
if pkt_type != packet::Type::Short {
let len = pn_len + payload_len;
let (_, mut payload_with_len) = b.split_at(header_offset)?;
payload_with_len
.put_varint_with_len(len as u64, PAYLOAD_LENGTH_LEN)?;
}
trace!(
"{} tx pkt {:?} len={} pn={}",
self.trace_id,
hdr,
payload_len,
pn
);
qlog_with!(self.qlog_streamer, q, {
let qlog_pkt_hdr = qlog::PacketHeader::with_type(
hdr.ty.to_qlog(),
pn,
Some(payload_len as u64 + payload_offset as u64),
Some(payload_len as u64),
Some(hdr.version),
Some(&hdr.scid),
Some(&hdr.dcid),
);
let packet_sent_ev = qlog::event::Event::packet_sent_min(
hdr.ty.to_qlog(),
qlog_pkt_hdr,
Some(Vec::new()),
);
q.add_event(packet_sent_ev).ok();
});
for frame in &mut frames {
trace!("{} tx frm {:?}", self.trace_id, frame);
qlog_with!(self.qlog_streamer, q, {
q.add_frame(frame.to_qlog(), false).ok();
});
frame.shrink_for_retransmission();
}
qlog_with!(self.qlog_streamer, q, {
q.finish_frames().ok();
});
let aead = match self.pkt_num_spaces[epoch].crypto_seal {
Some(ref v) => v,
None => return Err(Error::InvalidState),
};
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
aead,
)?;
let sent_pkt = recovery::Sent {
pkt_num: pn,
frames,
time_sent: now,
time_acked: None,
time_lost: None,
size: if ack_eliciting { written } else { 0 },
ack_eliciting,
in_flight,
delivered: 0,
delivered_time: now,
recent_delivered_packet_sent_time: now,
is_app_limited: false,
has_data,
};
self.recovery.on_packet_sent(
sent_pkt,
epoch,
self.handshake_status(),
now,
&self.trace_id,
);
qlog_with!(self.qlog_streamer, q, {
let ev = self.recovery.to_qlog();
q.add_event(ev).ok();
});
self.pkt_num_spaces[epoch].next_pkt_num += 1;
self.sent_count += 1;
if self.dgram_send_queue.byte_size() > self.recovery.cwnd_available() {
self.recovery.update_app_limited(false);
}
if !self.is_server && hdr.ty == packet::Type::Handshake {
self.drop_epoch_state(packet::EPOCH_INITIAL, now);
}
self.max_send_bytes = self.max_send_bytes.saturating_sub(written);
if ack_eliciting && !self.ack_eliciting_sent {
if let Some(idle_timeout) = self.idle_timeout() {
self.idle_timer = Some(now + idle_timeout);
}
}
if ack_eliciting {
self.ack_eliciting_sent = true;
}
Ok((pkt_type, written))
}
pub fn stream_recv(
&mut self, stream_id: u64, out: &mut [u8],
) -> Result<(usize, bool)> {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let stream = self
.streams
.get_mut(stream_id)
.ok_or(Error::InvalidStreamState)?;
if !stream.is_readable() {
return Err(Error::Done);
}
#[cfg(feature = "qlog")]
let offset = stream.recv.off_front();
let (read, fin) = stream.recv.emit(out)?;
self.max_rx_data_next = self.max_rx_data_next.saturating_add(read as u64);
let readable = stream.is_readable();
let complete = stream.is_complete();
let local = stream.local;
if stream.recv.almost_full() {
self.streams.mark_almost_full(stream_id, true);
}
if !readable {
self.streams.mark_readable(stream_id, false);
}
if complete {
self.streams.collect(stream_id, local);
}
qlog_with!(self.qlog_streamer, q, {
let ev = qlog::event::Event::h3_data_moved(
stream_id.to_string(),
Some(offset.to_string()),
Some(read as u64),
Some(qlog::H3DataRecipient::Transport),
None,
None,
);
q.add_event(ev).ok();
});
if self.should_update_max_data() {
self.almost_full = true;
}
Ok((read, fin))
}
pub fn stream_send(
&mut self, stream_id: u64, buf: &[u8], fin: bool,
) -> Result<usize> {
if !stream::is_bidi(stream_id) &&
!stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
if self.max_tx_data - self.tx_data < buf.len() as u64 {
self.blocked_limit = Some(self.max_tx_data);
}
let cap = self.tx_cap;
let (buf, fin) = if cap < buf.len() {
(&buf[..cap], false)
} else {
(buf, fin)
};
let stream = self.get_or_create_stream(stream_id, true)?;
#[cfg(feature = "qlog")]
let offset = stream.send.off_back();
let was_flushable = stream.is_flushable();
let sent = match stream.send.write(buf, fin) {
Ok(v) => v,
Err(e) => {
self.streams.mark_writable(stream_id, false);
return Err(e);
},
};
let urgency = stream.urgency;
let incremental = stream.incremental;
let flushable = stream.is_flushable();
let writable = stream.is_writable();
let empty_fin = buf.is_empty() && fin;
if sent < buf.len() {
let max_off = stream.send.max_off();
self.streams.mark_blocked(stream_id, true, max_off);
} else {
self.streams.mark_blocked(stream_id, false, 0);
}
if (flushable || empty_fin) && !was_flushable {
self.streams.push_flushable(stream_id, urgency, incremental);
}
if !writable {
self.streams.mark_writable(stream_id, false);
}
self.tx_cap -= sent;
self.tx_data += sent as u64;
self.recovery.rate_check_app_limited();
qlog_with!(self.qlog_streamer, q, {
let ev = qlog::event::Event::h3_data_moved(
stream_id.to_string(),
Some(offset.to_string()),
Some(sent as u64),
None,
Some(qlog::H3DataRecipient::Transport),
None,
);
q.add_event(ev).ok();
});
Ok(sent)
}
pub fn stream_priority(
&mut self, stream_id: u64, urgency: u8, incremental: bool,
) -> Result<()> {
let stream = match self.get_or_create_stream(stream_id, true) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
if stream.urgency == urgency && stream.incremental == incremental {
return Ok(());
}
stream.urgency = urgency;
stream.incremental = incremental;
Ok(())
}
pub fn stream_shutdown(
&mut self, stream_id: u64, direction: Shutdown, err: u64,
) -> Result<()> {
let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
match direction {
Shutdown::Read => {
stream.recv.shutdown()?;
if !stream.recv.is_fin() {
self.streams.mark_stopped(stream_id, true, err);
}
self.streams.mark_readable(stream_id, false);
},
Shutdown::Write => {
let final_size = stream.send.shutdown()?;
self.streams.mark_reset(stream_id, true, err, final_size);
self.streams.mark_writable(stream_id, false);
},
}
Ok(())
}
#[inline]
pub fn stream_capacity(&self, stream_id: u64) -> Result<usize> {
if let Some(stream) = self.streams.get(stream_id) {
let cap = cmp::min(self.tx_cap, stream.send.cap()?);
return Ok(cap);
};
Err(Error::InvalidStreamState)
}
pub fn stream_readable(&self, stream_id: u64) -> bool {
let stream = match self.streams.get(stream_id) {
Some(v) => v,
None => return false,
};
stream.is_readable()
}
#[inline]
pub fn stream_finished(&self, stream_id: u64) -> bool {
let stream = match self.streams.get(stream_id) {
Some(v) => v,
None => return true,
};
stream.recv.is_fin()
}
#[inline]
pub fn peer_streams_left_bidi(&self) -> u64 {
self.streams.peer_streams_left_bidi()
}
#[inline]
pub fn peer_streams_left_uni(&self) -> u64 {
self.streams.peer_streams_left_uni()
}
pub fn stream_init_application_data<T>(
&mut self, stream_id: u64, data: T,
) -> Result<()>
where
T: std::any::Any + Send + Sync,
{
let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
if stream.data.is_some() {
return Err(Error::Done);
}
stream.data = Some(Box::new(data));
Ok(())
}
pub fn stream_application_data(
&mut self, stream_id: u64,
) -> Option<&mut dyn std::any::Any> {
let stream = self.streams.get_mut(stream_id)?;
if let Some(ref mut stream_data) = stream.data {
return Some(stream_data.as_mut());
}
None
}
#[inline]
pub fn readable(&self) -> StreamIter {
self.streams.readable()
}
#[inline]
pub fn writable(&self) -> StreamIter {
if self.tx_cap == 0 {
return StreamIter::default();
}
self.streams.writable()
}
pub fn max_send_udp_payload_size(&self) -> usize {
if self.is_established() {
cmp::min(16383, self.recovery.max_datagram_size())
} else {
MIN_CLIENT_INITIAL_LEN
}
}
#[inline]
pub fn dgram_recv(&mut self, buf: &mut [u8]) -> Result<usize> {
match self.dgram_recv_queue.pop() {
Some(d) => {
if d.len() > buf.len() {
return Err(Error::BufferTooShort);
}
buf[..d.len()].copy_from_slice(&d);
Ok(d.len())
},
None => Err(Error::Done),
}
}
#[inline]
pub fn dgram_recv_peek(&self, buf: &mut [u8], len: usize) -> Result<usize> {
self.dgram_recv_queue.peek_front_bytes(buf, len)
}
#[inline]
pub fn dgram_recv_front_len(&self) -> Option<usize> {
self.dgram_recv_queue.peek_front_len()
}
pub fn dgram_send(&mut self, buf: &[u8]) -> Result<()> {
let max_payload_len = match self.dgram_max_writable_len() {
Some(v) => v as usize,
None => {
return Err(Error::InvalidState);
},
};
if buf.len() > max_payload_len {
return Err(Error::BufferTooShort);
}
self.dgram_send_queue.push(buf)?;
if self.dgram_send_queue.byte_size() > self.recovery.cwnd_available() {
self.recovery.update_app_limited(false);
}
Ok(())
}
#[inline]
pub fn dgram_purge_outgoing<F: Fn(&[u8]) -> bool>(&mut self, f: F) {
self.dgram_send_queue.purge(f);
}
#[inline]
pub fn dgram_max_writable_len(&self) -> Option<usize> {
match self.peer_transport_params.max_datagram_frame_size {
None => None,
Some(peer_frame_len) => {
let mut max_len = self.max_send_udp_payload_size();
max_len = max_len.saturating_sub(1 + self.dcid.len());
max_len = max_len.saturating_sub(packet::MAX_PKT_NUM_LEN);
max_len = max_len.saturating_sub(
self.pkt_num_spaces[packet::EPOCH_APPLICATION]
.crypto_overhead()?,
);
max_len = cmp::min(peer_frame_len as usize, max_len);
max_len.checked_sub(1 + frame::MAX_DGRAM_OVERHEAD)
},
}
}
fn dgram_enabled(&self) -> bool {
self.local_transport_params
.max_datagram_frame_size
.is_some()
}
pub fn timeout(&self) -> Option<time::Duration> {
if self.is_closed() {
return None;
}
let timeout = if self.is_draining() {
self.draining_timer
} else {
let timers = [self.idle_timer, self.recovery.loss_detection_timer()];
timers.iter().filter_map(|&x| x).min()
};
if let Some(timeout) = timeout {
let now = time::Instant::now();
if timeout <= now {
return Some(time::Duration::new(0, 0));
}
return Some(timeout.duration_since(now));
}
None
}
pub fn on_timeout(&mut self) {
let now = time::Instant::now();
if let Some(draining_timer) = self.draining_timer {
if draining_timer <= now {
trace!("{} draining timeout expired", self.trace_id);
qlog_with!(self.qlog_streamer, q, {
q.finish_log().ok();
});
self.closed = true;
}
return;
}
if let Some(timer) = self.idle_timer {
if timer <= now {
trace!("{} idle timeout expired", self.trace_id);
qlog_with!(self.qlog_streamer, q, {
q.finish_log().ok();
});
self.closed = true;
return;
}
}
if let Some(timer) = self.recovery.loss_detection_timer() {
if timer <= now {
trace!("{} loss detection timeout expired", self.trace_id);
self.recovery.on_loss_detection_timeout(
self.handshake_status(),
now,
&self.trace_id,
);
qlog_with!(self.qlog_streamer, q, {
let ev = self.recovery.to_qlog();
q.add_event(ev).ok();
});
return;
}
}
}
pub fn close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()> {
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
if self.local_error.is_some() {
return Err(Error::Done);
}
self.local_error = Some(ConnectionError {
is_app: app,
error_code: err,
reason: reason.to_vec(),
});
if self.recv_count == 0 {
self.closed = true;
}
Ok(())
}
#[inline]
pub fn trace_id(&self) -> &str {
&self.trace_id
}
#[inline]
pub fn application_proto(&self) -> &[u8] {
self.alpn.as_ref()
}
#[inline]
pub fn peer_cert(&self) -> Option<Vec<u8>> {
self.handshake.lock().unwrap().peer_cert()
}
#[inline]
pub fn source_id(&self) -> ConnectionId {
ConnectionId::from_ref(self.scid.as_ref())
}
#[inline]
pub fn destination_id(&self) -> ConnectionId {
ConnectionId::from_ref(self.dcid.as_ref())
}
#[inline]
pub fn is_established(&self) -> bool {
self.handshake_completed
}
#[inline]
pub fn is_resumed(&self) -> bool {
self.handshake.lock().unwrap().is_resumed()
}
#[inline]
pub fn is_in_early_data(&self) -> bool {
self.handshake.lock().unwrap().is_in_early_data()
}
#[inline]
pub fn is_readable(&self) -> bool {
self.streams.has_readable() || self.dgram_recv_front_len().is_some()
}
#[inline]
pub fn is_draining(&self) -> bool {
self.draining_timer.is_some()
}
#[inline]
pub fn is_closed(&self) -> bool {
self.closed
}
#[inline]
pub fn peer_error(&self) -> Option<&ConnectionError> {
self.peer_error.as_ref()
}
#[inline]
pub fn stats(&self) -> Stats {
Stats {
recv: self.recv_count,
sent: self.sent_count,
lost: self.recovery.lost_count,
cwnd: self.recovery.cwnd(),
rtt: self.recovery.rtt(),
delivery_rate: self.recovery.delivery_rate(),
}
}
fn encode_transport_params(&mut self) -> Result<()> {
let mut raw_params = [0; 128];
let raw_params = TransportParams::encode(
&self.local_transport_params,
self.is_server,
&mut raw_params,
)?;
self.handshake
.lock()
.unwrap()
.set_quic_transport_params(raw_params)?;
Ok(())
}
fn do_handshake(&mut self) -> Result<()> {
let handshake = self.handshake.lock().unwrap();
if handshake.is_completed() {
return Ok(());
}
match handshake.do_handshake() {
Ok(_) => (),
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
self.handshake_completed = handshake.is_completed();
self.alpn = handshake.alpn_protocol().to_vec();
trace!("{} connection established: proto={:?} cipher={:?} curve={:?} sigalg={:?} resumed={} {:?}",
&self.trace_id,
std::str::from_utf8(handshake.alpn_protocol()),
handshake.cipher(),
handshake.curve(),
handshake.sigalg(),
handshake.is_resumed(),
self.peer_transport_params);
Ok(())
}
fn write_epoch(&self) -> Result<packet::Epoch> {
if self
.local_error
.as_ref()
.map_or(false, |conn_err| !conn_err.is_app)
{
let epoch = match self.handshake.lock().unwrap().write_level() {
crypto::Level::Initial => packet::EPOCH_INITIAL,
crypto::Level::ZeroRTT => unreachable!(),
crypto::Level::Handshake => packet::EPOCH_HANDSHAKE,
crypto::Level::OneRTT => packet::EPOCH_APPLICATION,
};
if epoch == packet::EPOCH_APPLICATION && !self.is_established() {
return Ok(packet::EPOCH_HANDSHAKE);
}
return Ok(epoch);
}
for epoch in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
if self.pkt_num_spaces[epoch].crypto_seal.is_none() {
continue;
}
if self.pkt_num_spaces[epoch].ready() {
return Ok(epoch);
}
if !self.recovery.lost[epoch].is_empty() {
return Ok(epoch);
}
if self.recovery.loss_probes[epoch] > 0 {
return Ok(epoch);
}
}
if (self.is_established() || self.is_in_early_data()) &&
(!self.handshake_done_sent ||
self.almost_full ||
self.blocked_limit.is_some() ||
self.dgram_send_queue.has_pending() ||
self.local_error
.as_ref()
.map_or(false, |conn_err| conn_err.is_app) ||
self.streams.should_update_max_streams_bidi() ||
self.streams.should_update_max_streams_uni() ||
self.streams.has_flushable() ||
self.streams.has_almost_full() ||
self.streams.has_blocked() ||
self.streams.has_reset() ||
self.streams.has_stopped())
{
return Ok(packet::EPOCH_APPLICATION);
}
Err(Error::Done)
}
fn get_or_create_stream(
&mut self, id: u64, local: bool,
) -> Result<&mut stream::Stream> {
self.streams.get_or_create(
id,
&self.local_transport_params,
&self.peer_transport_params,
local,
self.is_server,
)
}
fn process_frame(
&mut self, frame: frame::Frame, epoch: packet::Epoch, now: time::Instant,
) -> Result<()> {
trace!("{} rx frm {:?}", self.trace_id, frame);
match frame {
frame::Frame::Padding { .. } => (),
frame::Frame::Ping => (),
frame::Frame::ACK { ranges, ack_delay } => {
let ack_delay = ack_delay
.checked_mul(2_u64.pow(
self.peer_transport_params.ack_delay_exponent as u32,
))
.ok_or(Error::InvalidFrame)?;
if epoch == packet::EPOCH_HANDSHAKE {
self.peer_verified_address = true;
}
if epoch == packet::EPOCH_APPLICATION && self.is_established() {
self.peer_verified_address = true;
self.handshake_confirmed = true;
}
self.recovery.on_ack_received(
&ranges,
ack_delay,
epoch,
self.handshake_status(),
now,
&self.trace_id,
)?;
if self.handshake_confirmed {
self.drop_epoch_state(packet::EPOCH_HANDSHAKE, now);
}
},
frame::Frame::ResetStream {
stream_id,
final_size,
..
} => {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
self.rx_data += stream.recv.reset(final_size)? as u64;
if self.rx_data > self.max_rx_data {
return Err(Error::FlowControl);
}
},
frame::Frame::StopSending {
stream_id,
error_code,
} => {
if !stream::is_local(stream_id, self.is_server) &&
!stream::is_bidi(stream_id)
{
return Err(Error::InvalidStreamState);
}
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let was_writable = stream.is_writable();
if let Ok(final_size) = stream.send.stop(error_code) {
self.streams
.mark_reset(stream_id, true, error_code, final_size);
if !was_writable {
self.streams.mark_writable(stream_id, true);
}
}
},
frame::Frame::Crypto { data } => {
self.pkt_num_spaces[epoch].crypto_stream.recv.write(data)?;
let mut crypto_buf = [0; 512];
let level = crypto::Level::from_epoch(epoch);
let stream = &mut self.pkt_num_spaces[epoch].crypto_stream;
while let Ok((read, _)) = stream.recv.emit(&mut crypto_buf) {
let recv_buf = &crypto_buf[..read];
self.handshake
.lock()
.unwrap()
.provide_data(level, &recv_buf)?;
}
self.do_handshake()?;
let handshake = self.handshake.lock().unwrap();
let raw_params = handshake.quic_transport_params();
if !self.parsed_peer_transport_params && !raw_params.is_empty() {
let peer_params =
TransportParams::decode(&raw_params, self.is_server)?;
if self.version >= PROTOCOL_VERSION_DRAFT28 {
match &peer_params.initial_source_connection_id {
Some(v) if v != &self.dcid =>
return Err(Error::InvalidTransportParam),
Some(_) => (),
None => return Err(Error::InvalidTransportParam),
}
if let Some(odcid) = &self.odcid {
match &peer_params.original_destination_connection_id
{
Some(v) if v != odcid =>
return Err(Error::InvalidTransportParam),
Some(_) => (),
None if !self.is_server =>
return Err(Error::InvalidTransportParam),
None => (),
}
}
if let Some(rscid) = &self.rscid {
match &peer_params.retry_source_connection_id {
Some(v) if v != rscid =>
return Err(Error::InvalidTransportParam),
Some(_) => (),
None => return Err(Error::InvalidTransportParam),
}
}
} else {
if self.did_retry &&
peer_params.original_destination_connection_id !=
self.odcid
{
return Err(Error::InvalidTransportParam);
}
}
self.max_tx_data = peer_params.initial_max_data;
self.streams.update_peer_max_streams_bidi(
peer_params.initial_max_streams_bidi,
);
self.streams.update_peer_max_streams_uni(
peer_params.initial_max_streams_uni,
);
self.recovery.max_ack_delay =
time::Duration::from_millis(peer_params.max_ack_delay);
self.recovery.update_max_datagram_size(
peer_params.max_udp_payload_size as usize,
);
self.peer_transport_params = peer_params;
self.parsed_peer_transport_params = true;
}
},
frame::Frame::CryptoHeader { .. } => unreachable!(),
frame::Frame::NewToken { .. } => (),
frame::Frame::Stream { stream_id, data } => {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let max_rx_data_left = self.max_rx_data - self.rx_data;
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let max_off_delta =
data.max_off().saturating_sub(stream.recv.max_off());
if max_off_delta > max_rx_data_left {
return Err(Error::FlowControl);
}
stream.recv.write(data)?;
if stream.is_readable() {
self.streams.mark_readable(stream_id, true);
}
self.rx_data += max_off_delta;
},
frame::Frame::StreamHeader { .. } => unreachable!(),
frame::Frame::MaxData { max } => {
self.max_tx_data = cmp::max(self.max_tx_data, max);
},
frame::Frame::MaxStreamData { stream_id, max } => {
if !stream::is_bidi(stream_id) &&
!stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let was_flushable = stream.is_flushable();
stream.send.update_max_data(max);
let writable = stream.is_writable();
if stream.is_flushable() && !was_flushable {
let urgency = stream.urgency;
let incremental = stream.incremental;
self.streams.push_flushable(stream_id, urgency, incremental);
}
if writable {
self.streams.mark_writable(stream_id, true);
}
},
frame::Frame::MaxStreamsBidi { max } => {
if max > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
}
self.streams.update_peer_max_streams_bidi(max);
},
frame::Frame::MaxStreamsUni { max } => {
if max > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
}
self.streams.update_peer_max_streams_uni(max);
},
frame::Frame::DataBlocked { .. } => (),
frame::Frame::StreamDataBlocked { .. } => (),
frame::Frame::StreamsBlockedBidi { limit } =>
if limit > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
},
frame::Frame::StreamsBlockedUni { limit } =>
if limit > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
},
frame::Frame::NewConnectionId { .. } => (),
frame::Frame::RetireConnectionId { .. } => (),
frame::Frame::PathChallenge { data } => {
self.challenge = Some(data);
},
frame::Frame::PathResponse { .. } => (),
frame::Frame::ConnectionClose {
error_code, reason, ..
} => {
self.peer_error = Some(ConnectionError {
is_app: false,
error_code,
reason,
});
self.draining_timer = Some(now + (self.recovery.pto() * 3));
},
frame::Frame::ApplicationClose { error_code, reason } => {
self.peer_error = Some(ConnectionError {
is_app: true,
error_code,
reason,
});
self.draining_timer = Some(now + (self.recovery.pto() * 3));
},
frame::Frame::HandshakeDone => {
if self.is_server {
return Err(Error::InvalidPacket);
}
self.peer_verified_address = true;
self.handshake_confirmed = true;
self.drop_epoch_state(packet::EPOCH_HANDSHAKE, now);
},
frame::Frame::Datagram { data } => {
if !self.dgram_enabled() {
return Err(Error::InvalidState);
}
if self.dgram_recv_queue.is_full() {
self.dgram_recv_queue.pop();
}
self.dgram_recv_queue.push(&data)?;
},
}
Ok(())
}
fn drop_epoch_state(&mut self, epoch: packet::Epoch, now: time::Instant) {
if self.pkt_num_spaces[epoch].crypto_open.is_none() {
return;
}
self.pkt_num_spaces[epoch].crypto_open = None;
self.pkt_num_spaces[epoch].crypto_seal = None;
self.pkt_num_spaces[epoch].clear();
self.recovery.on_pkt_num_space_discarded(
epoch,
self.handshake_status(),
now,
);
trace!("{} dropped epoch {} state", self.trace_id, epoch);
}
fn should_update_max_data(&self) -> bool {
self.max_rx_data_next != self.max_rx_data &&
self.max_rx_data_next / 2 > self.max_rx_data - self.rx_data
}
fn idle_timeout(&mut self) -> Option<time::Duration> {
if self.local_transport_params.max_idle_timeout == 0 &&
self.peer_transport_params.max_idle_timeout == 0
{
return None;
}
let idle_timeout = if self.local_transport_params.max_idle_timeout == 0 {
self.peer_transport_params.max_idle_timeout
} else if self.peer_transport_params.max_idle_timeout == 0 {
self.local_transport_params.max_idle_timeout
} else {
cmp::min(
self.local_transport_params.max_idle_timeout,
self.peer_transport_params.max_idle_timeout,
)
};
let idle_timeout = time::Duration::from_millis(idle_timeout);
let idle_timeout = cmp::max(idle_timeout, 3 * self.recovery.pto());
Some(idle_timeout)
}
fn handshake_status(&self) -> recovery::HandshakeStatus {
recovery::HandshakeStatus {
has_handshake_keys: self.pkt_num_spaces[packet::EPOCH_HANDSHAKE]
.has_keys(),
peer_verified_address: self.peer_verified_address,
completed: self.is_established(),
}
}
}
fn drop_pkt_on_err(
e: Error, recv_count: usize, is_server: bool, trace_id: &str,
) -> Error {
if is_server && recv_count == 0 {
return e;
}
trace!("{} dropped invalid packet", trace_id);
Error::Done
}
#[derive(Clone)]
pub struct Stats {
pub recv: usize,
pub sent: usize,
pub lost: usize,
pub rtt: time::Duration,
pub cwnd: usize,
pub delivery_rate: u64,
}
impl std::fmt::Debug for Stats {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"recv={} sent={} lost={} rtt={:?} cwnd={}",
self.recv, self.sent, self.lost, self.rtt, self.cwnd,
)
}
}
#[derive(Clone, Debug, PartialEq)]
struct TransportParams {
pub original_destination_connection_id: Option<ConnectionId<'static>>,
pub max_idle_timeout: u64,
pub stateless_reset_token: Option<Vec<u8>>,
pub max_udp_payload_size: u64,
pub initial_max_data: u64,
pub initial_max_stream_data_bidi_local: u64,
pub initial_max_stream_data_bidi_remote: u64,
pub initial_max_stream_data_uni: u64,
pub initial_max_streams_bidi: u64,
pub initial_max_streams_uni: u64,
pub ack_delay_exponent: u64,
pub max_ack_delay: u64,
pub disable_active_migration: bool,
pub active_conn_id_limit: u64,
pub initial_source_connection_id: Option<ConnectionId<'static>>,
pub retry_source_connection_id: Option<ConnectionId<'static>>,
pub max_datagram_frame_size: Option<u64>,
}
impl Default for TransportParams {
fn default() -> TransportParams {
TransportParams {
original_destination_connection_id: None,
max_idle_timeout: 0,
stateless_reset_token: None,
max_udp_payload_size: 65527,
initial_max_data: 0,
initial_max_stream_data_bidi_local: 0,
initial_max_stream_data_bidi_remote: 0,
initial_max_stream_data_uni: 0,
initial_max_streams_bidi: 0,
initial_max_streams_uni: 0,
ack_delay_exponent: 3,
max_ack_delay: 25,
disable_active_migration: false,
active_conn_id_limit: 2,
initial_source_connection_id: None,
retry_source_connection_id: None,
max_datagram_frame_size: None,
}
}
}
impl TransportParams {
fn decode(buf: &[u8], is_server: bool) -> Result<TransportParams> {
let mut params = octets::Octets::with_slice(buf);
let mut tp = TransportParams::default();
while params.cap() > 0 {
let id = params.get_varint()?;
let mut val = params.get_bytes_with_varint_length()?;
match id {
0x0000 => {
if is_server {
return Err(Error::InvalidTransportParam);
}
tp.original_destination_connection_id =
Some(val.to_vec().into());
},
0x0001 => {
tp.max_idle_timeout = val.get_varint()?;
},
0x0002 => {
if is_server {
return Err(Error::InvalidTransportParam);
}
tp.stateless_reset_token = Some(val.get_bytes(16)?.to_vec());
},
0x0003 => {
tp.max_udp_payload_size = val.get_varint()?;
if tp.max_udp_payload_size < 1200 {
return Err(Error::InvalidTransportParam);
}
},
0x0004 => {
tp.initial_max_data = val.get_varint()?;
},
0x0005 => {
tp.initial_max_stream_data_bidi_local = val.get_varint()?;
},
0x0006 => {
tp.initial_max_stream_data_bidi_remote = val.get_varint()?;
},
0x0007 => {
tp.initial_max_stream_data_uni = val.get_varint()?;
},
0x0008 => {
let max = val.get_varint()?;
if max > MAX_STREAM_ID {
return Err(Error::InvalidTransportParam);
}
tp.initial_max_streams_bidi = max;
},
0x0009 => {
let max = val.get_varint()?;
if max > MAX_STREAM_ID {
return Err(Error::InvalidTransportParam);
}
tp.initial_max_streams_uni = max;
},
0x000a => {
let ack_delay_exponent = val.get_varint()?;
if ack_delay_exponent > 20 {
return Err(Error::InvalidTransportParam);
}
tp.ack_delay_exponent = ack_delay_exponent;
},
0x000b => {
let max_ack_delay = val.get_varint()?;
if max_ack_delay >= 2_u64.pow(14) {
return Err(Error::InvalidTransportParam);
}
tp.max_ack_delay = max_ack_delay;
},
0x000c => {
tp.disable_active_migration = true;
},
0x000d => {
if is_server {
return Err(Error::InvalidTransportParam);
}
},
0x000e => {
let limit = val.get_varint()?;
if limit < 2 {
return Err(Error::InvalidTransportParam);
}
tp.active_conn_id_limit = limit;
},
0x000f => {
tp.initial_source_connection_id = Some(val.to_vec().into());
},
0x00010 => {
if is_server {
return Err(Error::InvalidTransportParam);
}
tp.retry_source_connection_id = Some(val.to_vec().into());
},
0x0020 => {
tp.max_datagram_frame_size = Some(val.get_varint()?);
},
_ => (),
}
}
Ok(tp)
}
fn encode_param(
b: &mut octets::OctetsMut, ty: u64, len: usize,
) -> Result<()> {
b.put_varint(ty)?;
b.put_varint(len as u64)?;
Ok(())
}
fn encode<'a>(
tp: &TransportParams, is_server: bool, out: &'a mut [u8],
) -> Result<&'a mut [u8]> {
let mut b = octets::OctetsMut::with_slice(out);
if is_server {
if let Some(ref odcid) = tp.original_destination_connection_id {
TransportParams::encode_param(&mut b, 0x0000, odcid.len())?;
b.put_bytes(&odcid)?;
}
};
if tp.max_idle_timeout != 0 {
TransportParams::encode_param(
&mut b,
0x0001,
octets::varint_len(tp.max_idle_timeout),
)?;
b.put_varint(tp.max_idle_timeout)?;
}
if is_server {
if let Some(ref token) = tp.stateless_reset_token {
TransportParams::encode_param(&mut b, 0x0002, token.len())?;
b.put_bytes(&token)?;
}
}
if tp.max_udp_payload_size != 0 {
TransportParams::encode_param(
&mut b,
0x0003,
octets::varint_len(tp.max_udp_payload_size),
)?;
b.put_varint(tp.max_udp_payload_size)?;
}
if tp.initial_max_data != 0 {
TransportParams::encode_param(
&mut b,
0x0004,
octets::varint_len(tp.initial_max_data),
)?;
b.put_varint(tp.initial_max_data)?;
}
if tp.initial_max_stream_data_bidi_local != 0 {
TransportParams::encode_param(
&mut b,
0x0005,
octets::varint_len(tp.initial_max_stream_data_bidi_local),
)?;
b.put_varint(tp.initial_max_stream_data_bidi_local)?;
}
if tp.initial_max_stream_data_bidi_remote != 0 {
TransportParams::encode_param(
&mut b,
0x0006,
octets::varint_len(tp.initial_max_stream_data_bidi_remote),
)?;
b.put_varint(tp.initial_max_stream_data_bidi_remote)?;
}
if tp.initial_max_stream_data_uni != 0 {
TransportParams::encode_param(
&mut b,
0x0007,
octets::varint_len(tp.initial_max_stream_data_uni),
)?;
b.put_varint(tp.initial_max_stream_data_uni)?;
}
if tp.initial_max_streams_bidi != 0 {
TransportParams::encode_param(
&mut b,
0x0008,
octets::varint_len(tp.initial_max_streams_bidi),
)?;
b.put_varint(tp.initial_max_streams_bidi)?;
}
if tp.initial_max_streams_uni != 0 {
TransportParams::encode_param(
&mut b,
0x0009,
octets::varint_len(tp.initial_max_streams_uni),
)?;
b.put_varint(tp.initial_max_streams_uni)?;
}
if tp.ack_delay_exponent != 0 {
TransportParams::encode_param(
&mut b,
0x000a,
octets::varint_len(tp.ack_delay_exponent),
)?;
b.put_varint(tp.ack_delay_exponent)?;
}
if tp.max_ack_delay != 0 {
TransportParams::encode_param(
&mut b,
0x000b,
octets::varint_len(tp.max_ack_delay),
)?;
b.put_varint(tp.max_ack_delay)?;
}
if tp.disable_active_migration {
TransportParams::encode_param(&mut b, 0x000c, 0)?;
}
if tp.active_conn_id_limit != 2 {
TransportParams::encode_param(
&mut b,
0x000e,
octets::varint_len(tp.active_conn_id_limit),
)?;
b.put_varint(tp.active_conn_id_limit)?;
}
if let Some(scid) = &tp.initial_source_connection_id {
TransportParams::encode_param(&mut b, 0x000f, scid.len())?;
b.put_bytes(&scid)?;
}
if is_server {
if let Some(scid) = &tp.retry_source_connection_id {
TransportParams::encode_param(&mut b, 0x0010, scid.len())?;
b.put_bytes(&scid)?;
}
}
if let Some(max_datagram_frame_size) = tp.max_datagram_frame_size {
TransportParams::encode_param(
&mut b,
0x0020,
octets::varint_len(max_datagram_frame_size),
)?;
b.put_varint(max_datagram_frame_size)?;
}
let out_len = b.off();
Ok(&mut out[..out_len])
}
#[cfg(feature = "qlog")]
pub fn to_qlog(
&self, owner: qlog::TransportOwner, version: u32, alpn: &[u8],
cipher: Option<crypto::Algorithm>,
) -> qlog::event::Event {
let ocid = qlog::HexSlice::maybe_string(
self.original_destination_connection_id.as_ref(),
);
let stateless_reset_token =
qlog::HexSlice::maybe_string(self.stateless_reset_token.as_ref());
qlog::event::Event::transport_parameters_set(
Some(owner),
None,
None,
String::from_utf8(alpn.to_vec()).ok(),
Some(format!("{:x?}", version)),
Some(format!("{:?}", cipher)),
ocid,
stateless_reset_token,
Some(self.disable_active_migration),
Some(self.max_idle_timeout),
Some(self.max_udp_payload_size),
Some(self.ack_delay_exponent),
Some(self.max_ack_delay),
Some(self.active_conn_id_limit),
Some(self.initial_max_data.to_string()),
Some(self.initial_max_stream_data_bidi_local.to_string()),
Some(self.initial_max_stream_data_bidi_remote.to_string()),
Some(self.initial_max_stream_data_uni.to_string()),
Some(self.initial_max_streams_bidi.to_string()),
Some(self.initial_max_streams_uni.to_string()),
None,
)
}
}
#[doc(hidden)]
pub mod testing {
use super::*;
pub struct Pipe {
pub client: Pin<Box<Connection>>,
pub server: Pin<Box<Connection>>,
}
impl Pipe {
pub fn default() -> Result<Pipe> {
let mut config = Config::new(crate::PROTOCOL_VERSION)?;
config.load_cert_chain_from_pem_file("examples/cert.crt")?;
config.load_priv_key_from_pem_file("examples/cert.key")?;
config.set_application_protos(b"\x06proto1\x06proto2")?;
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.set_max_idle_timeout(180_000);
config.verify_peer(false);
config.set_ack_delay_exponent(5);
Pipe::with_config(&mut config)
}
pub fn with_config(config: &mut Config) -> Result<Pipe> {
let mut client_scid = [0; 16];
rand::rand_bytes(&mut client_scid[..]);
let client_scid = ConnectionId::from_ref(&client_scid);
let mut server_scid = [0; 16];
rand::rand_bytes(&mut server_scid[..]);
let server_scid = ConnectionId::from_ref(&server_scid);
Ok(Pipe {
client: connect(Some("quic.tech"), &client_scid, config)?,
server: accept(&server_scid, None, config)?,
})
}
pub fn with_client_config(client_config: &mut Config) -> Result<Pipe> {
let mut client_scid = [0; 16];
rand::rand_bytes(&mut client_scid[..]);
let client_scid = ConnectionId::from_ref(&client_scid);
let mut server_scid = [0; 16];
rand::rand_bytes(&mut server_scid[..]);
let server_scid = ConnectionId::from_ref(&server_scid);
let mut config = Config::new(crate::PROTOCOL_VERSION)?;
config.load_cert_chain_from_pem_file("examples/cert.crt")?;
config.load_priv_key_from_pem_file("examples/cert.key")?;
config.set_application_protos(b"\x06proto1\x06proto2")?;
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
Ok(Pipe {
client: connect(Some("quic.tech"), &client_scid, client_config)?,
server: accept(&server_scid, None, &mut config)?,
})
}
pub fn with_server_config(server_config: &mut Config) -> Result<Pipe> {
let mut client_scid = [0; 16];
rand::rand_bytes(&mut client_scid[..]);
let client_scid = ConnectionId::from_ref(&client_scid);
let mut server_scid = [0; 16];
rand::rand_bytes(&mut server_scid[..]);
let server_scid = ConnectionId::from_ref(&server_scid);
let mut config = Config::new(crate::PROTOCOL_VERSION)?;
config.set_application_protos(b"\x06proto1\x06proto2")?;
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
Ok(Pipe {
client: connect(Some("quic.tech"), &client_scid, &mut config)?,
server: accept(&server_scid, None, server_config)?,
})
}
pub fn handshake(&mut self) -> Result<()> {
while !self.client.is_established() || !self.server.is_established() {
let flight = emit_flight(&mut self.client)?;
process_flight(&mut self.server, flight)?;
let flight = emit_flight(&mut self.server)?;
process_flight(&mut self.client, flight)?;
}
Ok(())
}
pub fn advance(&mut self) -> Result<()> {
let mut client_done = false;
let mut server_done = false;
while !client_done || !server_done {
match emit_flight(&mut self.client) {
Ok(flight) => process_flight(&mut self.server, flight)?,
Err(Error::Done) => client_done = true,
Err(e) => return Err(e),
};
match emit_flight(&mut self.server) {
Ok(flight) => process_flight(&mut self.client, flight)?,
Err(Error::Done) => server_done = true,
Err(e) => return Err(e),
};
}
Ok(())
}
pub fn send_pkt_to_server(
&mut self, pkt_type: packet::Type, frames: &[frame::Frame],
buf: &mut [u8],
) -> Result<usize> {
let written = encode_pkt(&mut self.client, pkt_type, frames, buf)?;
recv_send(&mut self.server, buf, written)
}
}
pub fn recv_send(
conn: &mut Connection, buf: &mut [u8], len: usize,
) -> Result<usize> {
conn.recv(&mut buf[..len])?;
let mut off = 0;
match conn.send(&mut buf[off..]) {
Ok(write) => off += write,
Err(Error::Done) => (),
Err(e) => return Err(e),
}
Ok(off)
}
pub fn process_flight(
conn: &mut Connection, flight: Vec<Vec<u8>>,
) -> Result<()> {
for mut pkt in flight {
conn.recv(&mut pkt)?;
}
Ok(())
}
pub fn emit_flight(conn: &mut Connection) -> Result<Vec<Vec<u8>>> {
let mut flight = Vec::new();
loop {
let mut out = vec![0u8; 65535];
match conn.send(&mut out) {
Ok(written) => out.truncate(written),
Err(Error::Done) => break,
Err(e) => return Err(e),
};
flight.push(out);
}
if flight.is_empty() {
return Err(Error::Done);
}
Ok(flight)
}
pub fn encode_pkt(
conn: &mut Connection, pkt_type: packet::Type, frames: &[frame::Frame],
buf: &mut [u8],
) -> Result<usize> {
let mut b = octets::OctetsMut::with_slice(buf);
let epoch = pkt_type.to_epoch()?;
let space = &mut conn.pkt_num_spaces[epoch];
let pn = space.next_pkt_num;
let pn_len = 4;
let hdr = Header {
ty: pkt_type,
version: conn.version,
dcid: ConnectionId::from_ref(&conn.dcid),
scid: ConnectionId::from_ref(&conn.scid),
pkt_num: 0,
pkt_num_len: pn_len,
token: conn.token.clone(),
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b)?;
let payload_len = frames.iter().fold(0, |acc, x| acc + x.wire_len()) +
space.crypto_overhead().unwrap();
if pkt_type != packet::Type::Short {
let len = pn_len + payload_len;
b.put_varint(len as u64)?;
}
b.put_u32(pn as u32)?;
let payload_offset = b.off();
for frame in frames {
frame.to_bytes(&mut b)?;
}
let aead = match space.crypto_seal {
Some(ref v) => v,
None => return Err(Error::InvalidState),
};
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
aead,
)?;
space.next_pkt_num += 1;
Ok(written)
}
pub fn decode_pkt(
conn: &mut Connection, buf: &mut [u8], len: usize,
) -> Result<Vec<frame::Frame>> {
let mut b = octets::OctetsMut::with_slice(&mut buf[..len]);
let mut hdr = Header::from_bytes(&mut b, conn.scid.len()).unwrap();
let epoch = hdr.ty.to_epoch()?;
let aead = conn.pkt_num_spaces[epoch].crypto_open.as_ref().unwrap();
let payload_len = b.cap();
packet::decrypt_hdr(&mut b, &mut hdr, &aead).unwrap();
let pn = packet::decode_pkt_num(
conn.pkt_num_spaces[epoch].largest_rx_pkt_num,
hdr.pkt_num,
hdr.pkt_num_len,
);
let mut payload =
packet::decrypt_pkt(&mut b, pn, hdr.pkt_num_len, payload_len, aead)
.unwrap();
let mut frames = Vec::new();
while payload.cap() > 0 {
let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
frames.push(frame);
}
Ok(frames)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn transport_params() {
let tp = TransportParams {
original_destination_connection_id: None,
max_idle_timeout: 30,
stateless_reset_token: Some(vec![0xba; 16]),
max_udp_payload_size: 23_421,
initial_max_data: 424_645_563,
initial_max_stream_data_bidi_local: 154_323_123,
initial_max_stream_data_bidi_remote: 6_587_456,
initial_max_stream_data_uni: 2_461_234,
initial_max_streams_bidi: 12_231,
initial_max_streams_uni: 18_473,
ack_delay_exponent: 20,
max_ack_delay: 2_u64.pow(14) - 1,
disable_active_migration: true,
active_conn_id_limit: 8,
initial_source_connection_id: Some(b"woot woot".to_vec().into()),
retry_source_connection_id: Some(b"retry".to_vec().into()),
max_datagram_frame_size: Some(32),
};
let mut raw_params = [42; 256];
let raw_params =
TransportParams::encode(&tp, true, &mut raw_params).unwrap();
assert_eq!(raw_params.len(), 94);
let new_tp = TransportParams::decode(&raw_params, false).unwrap();
assert_eq!(new_tp, tp);
let tp = TransportParams {
original_destination_connection_id: None,
max_idle_timeout: 30,
stateless_reset_token: None,
max_udp_payload_size: 23_421,
initial_max_data: 424_645_563,
initial_max_stream_data_bidi_local: 154_323_123,
initial_max_stream_data_bidi_remote: 6_587_456,
initial_max_stream_data_uni: 2_461_234,
initial_max_streams_bidi: 12_231,
initial_max_streams_uni: 18_473,
ack_delay_exponent: 20,
max_ack_delay: 2_u64.pow(14) - 1,
disable_active_migration: true,
active_conn_id_limit: 8,
initial_source_connection_id: Some(b"woot woot".to_vec().into()),
retry_source_connection_id: None,
max_datagram_frame_size: Some(32),
};
let mut raw_params = [42; 256];
let raw_params =
TransportParams::encode(&tp, false, &mut raw_params).unwrap();
assert_eq!(raw_params.len(), 69);
let new_tp = TransportParams::decode(&raw_params, true).unwrap();
assert_eq!(new_tp, tp);
}
#[test]
fn unknown_version() {
let mut config = Config::new(0xbabababa).unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Err(Error::UnknownVersion));
}
#[test]
fn version_negotiation() {
let mut buf = [0; 65535];
let mut config = Config::new(0xbabababa).unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
let mut len = pipe.client.send(&mut buf).unwrap();
let hdr = packet::Header::from_slice(&mut buf[..len], 0).unwrap();
len = crate::negotiate_version(&hdr.scid, &hdr.dcid, &mut buf).unwrap();
assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.version, PROTOCOL_VERSION);
assert_eq!(pipe.server.version, PROTOCOL_VERSION);
}
#[test]
fn verify_custom_root() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config.verify_peer(true);
config
.load_verify_locations_from_file("examples/rootca.crt")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
}
#[test]
fn missing_initial_source_connection_id() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
pipe.client
.local_transport_params
.initial_source_connection_id = None;
assert_eq!(pipe.client.encode_transport_params(), Ok(()));
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(
pipe.server.recv(&mut buf[..len]),
Err(Error::InvalidTransportParam)
);
}
#[test]
fn invalid_initial_source_connection_id() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
pipe.client
.local_transport_params
.initial_source_connection_id = Some(b"bogus value".to_vec().into());
assert_eq!(pipe.client.encode_transport_params(), Ok(()));
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(
pipe.server.recv(&mut buf[..len]),
Err(Error::InvalidTransportParam)
);
}
#[test]
fn handshake() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(
pipe.client.application_proto(),
pipe.server.application_proto()
);
}
#[test]
fn handshake_done() {
let mut pipe = testing::Pipe::default().unwrap();
pipe.server
.handshake
.lock()
.unwrap()
.set_options(0x0000_4000);
assert_eq!(pipe.handshake(), Ok(()));
assert!(pipe.server.handshake_done_sent);
}
#[test]
fn handshake_confirmation() {
let mut pipe = testing::Pipe::default().unwrap();
let flight = testing::emit_flight(&mut pipe.client).unwrap();
testing::process_flight(&mut pipe.server, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.server).unwrap();
assert!(!pipe.client.is_established());
assert!(!pipe.client.handshake_confirmed);
assert!(!pipe.server.is_established());
assert!(!pipe.server.handshake_confirmed);
testing::process_flight(&mut pipe.client, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.client).unwrap();
assert!(pipe.client.is_established());
assert!(!pipe.client.handshake_confirmed);
assert!(!pipe.server.is_established());
assert!(!pipe.server.handshake_confirmed);
testing::process_flight(&mut pipe.server, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.server).unwrap();
assert!(pipe.client.is_established());
assert!(!pipe.client.handshake_confirmed);
assert!(pipe.server.is_established());
assert!(!pipe.server.handshake_confirmed);
testing::process_flight(&mut pipe.client, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.client).unwrap();
assert!(pipe.client.is_established());
assert!(pipe.client.handshake_confirmed);
assert!(pipe.server.is_established());
assert!(!pipe.server.handshake_confirmed);
testing::process_flight(&mut pipe.server, flight).unwrap();
assert!(pipe.client.is_established());
assert!(pipe.client.handshake_confirmed);
assert!(pipe.server.is_established());
assert!(pipe.server.handshake_confirmed);
}
#[test]
fn handshake_alpn_mismatch() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.set_application_protos(b"\x06proto3\x06proto4")
.unwrap();
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Err(Error::TlsFail));
assert_eq!(pipe.client.application_proto(), b"");
assert_eq!(pipe.server.application_proto(), b"");
}
#[test]
fn limit_handshake_data() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert-big.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\06proto2")
.unwrap();
let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
let flight = testing::emit_flight(&mut pipe.client).unwrap();
let client_sent = flight.iter().fold(0, |out, p| out + p.len());
testing::process_flight(&mut pipe.server, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.server).unwrap();
let server_sent = flight.iter().fold(0, |out, p| out + p.len());
assert_eq!(server_sent, (client_sent - 2) * MAX_AMPLIFICATION_FACTOR);
}
#[test]
fn stream() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
assert!(!pipe.server.stream_finished(4));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
let mut b = [0; 15];
assert_eq!(pipe.server.stream_recv(4, &mut b), Ok((12, true)));
assert_eq!(&b[..12], b"hello, world");
assert!(pipe.server.stream_finished(4));
}
#[test]
fn stream_send_on_32bit_arch() {
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(2_u64.pow(32) + 5);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(0);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
assert!(!pipe.server.stream_finished(4));
}
#[test]
fn empty_stream_frame() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaa", 0, false),
}];
let pkt_type = packet::Type::Short;
assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
let mut readable = pipe.server.readable();
assert_eq!(readable.next(), Some(4));
assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((5, false)));
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"", 5, true),
}];
let pkt_type = packet::Type::Short;
assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
let mut readable = pipe.server.readable();
assert_eq!(readable.next(), Some(4));
assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((0, true)));
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"", 15, true),
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::FinalSize)
);
}
#[test]
fn max_stream_data_receive_uni() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(2, b"hello", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
let frames = [frame::Frame::MaxStreamData {
stream_id: 2,
max: 1024,
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::InvalidStreamState),
);
}
#[test]
fn empty_payload() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &[], &mut buf),
Err(Error::InvalidPacket)
);
}
#[test]
fn min_payload() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
let frames = [frame::Frame::Padding { len: 4 }];
let pkt_type = packet::Type::Initial;
let written =
testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
.unwrap();
assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
assert_eq!(pipe.server.max_send_bytes, 195);
pipe.server.recovery.loss_probes[packet::EPOCH_INITIAL] = 1;
pipe.server.max_send_bytes = 60;
assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
}
#[test]
fn flow_control_limit() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 12,
data: stream::RangeBuf::from(b"a", 0, false),
},
];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::FlowControl),
);
}
#[test]
fn flow_control_limit_dup() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 12,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
},
];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
}
#[test]
fn flow_control_update() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"a", 0, false),
},
];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
pipe.server.stream_recv(4, &mut buf).unwrap();
pipe.server.stream_recv(8, &mut buf).unwrap();
let frames = [frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"a", 1, false),
}];
let len = pipe
.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
assert!(len > 0);
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next().unwrap();
assert_eq!(
iter.next(),
Some(&frame::Frame::MaxStreamData {
stream_id: 4,
max: 30
})
);
assert_eq!(iter.next(), Some(&frame::Frame::MaxData { max: 46 }));
}
#[test]
fn flow_control_drain() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.stream_send(4, b"aaaaa", true), Ok(5));
assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.stream_send(8, b"aaaaa", true), Ok(5));
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 42), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
assert_eq!(pipe.advance(), Ok(()));
}
#[test]
fn stream_flow_control_limit_bidi() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaaa", 0, true),
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::FlowControl),
);
}
#[test]
fn stream_flow_control_limit_uni() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::Stream {
stream_id: 2,
data: stream::RangeBuf::from(b"aaaaaaaaaaa", 0, true),
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::FlowControl),
);
}
#[test]
fn stream_flow_control_update() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaa", 0, false),
}];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
pipe.server.stream_recv(4, &mut buf).unwrap();
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"a", 7, false),
}];
let len = pipe
.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
assert!(len > 0);
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next().unwrap();
assert_eq!(
iter.next(),
Some(&frame::Frame::MaxStreamData {
stream_id: 4,
max: 22,
})
);
}
#[test]
fn stream_left_bidi() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(3, pipe.client.peer_streams_left_bidi());
assert_eq!(3, pipe.server.peer_streams_left_bidi());
pipe.server.stream_send(1, b"a", false).ok();
assert_eq!(2, pipe.server.peer_streams_left_bidi());
pipe.server.stream_send(5, b"a", false).ok();
assert_eq!(1, pipe.server.peer_streams_left_bidi());
pipe.server.stream_send(9, b"a", false).ok();
assert_eq!(0, pipe.server.peer_streams_left_bidi());
let frames = [frame::Frame::MaxStreamsBidi { max: MAX_STREAM_ID }];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
assert_eq!(MAX_STREAM_ID - 3, pipe.server.peer_streams_left_bidi());
}
#[test]
fn stream_left_uni() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(3, pipe.client.peer_streams_left_uni());
assert_eq!(3, pipe.server.peer_streams_left_uni());
pipe.server.stream_send(3, b"a", false).ok();
assert_eq!(2, pipe.server.peer_streams_left_uni());
pipe.server.stream_send(7, b"a", false).ok();
assert_eq!(1, pipe.server.peer_streams_left_uni());
pipe.server.stream_send(11, b"a", false).ok();
assert_eq!(0, pipe.server.peer_streams_left_uni());
let frames = [frame::Frame::MaxStreamsUni { max: MAX_STREAM_ID }];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
assert_eq!(MAX_STREAM_ID - 3, pipe.server.peer_streams_left_uni());
}
#[test]
fn stream_limit_bidi() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 12,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 16,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 20,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 24,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 28,
data: stream::RangeBuf::from(b"a", 0, false),
},
];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::StreamLimit),
);
}
#[test]
fn stream_limit_max_bidi() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::MaxStreamsBidi { max: MAX_STREAM_ID }];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
let frames = [frame::Frame::MaxStreamsBidi {
max: MAX_STREAM_ID + 1,
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::InvalidFrame),
);
}
#[test]
fn stream_limit_uni() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 2,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 6,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 10,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 14,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 18,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 22,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::Stream {
stream_id: 26,
data: stream::RangeBuf::from(b"a", 0, false),
},
];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::StreamLimit),
);
}
#[test]
fn stream_limit_max_uni() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::MaxStreamsUni { max: MAX_STREAM_ID }];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
let frames = [frame::Frame::MaxStreamsUni {
max: MAX_STREAM_ID + 1,
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::InvalidFrame),
);
}
#[test]
fn streams_blocked_max_bidi() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::StreamsBlockedBidi {
limit: MAX_STREAM_ID,
}];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
let frames = [frame::Frame::StreamsBlockedBidi {
limit: MAX_STREAM_ID + 1,
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::InvalidFrame),
);
}
#[test]
fn streams_blocked_max_uni() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::StreamsBlockedUni {
limit: MAX_STREAM_ID,
}];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
let frames = [frame::Frame::StreamsBlockedUni {
limit: MAX_STREAM_ID + 1,
}];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::InvalidFrame),
);
}
#[test]
fn stream_data_overlap() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"aaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"bbbbb", 3, false),
},
frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"ccccc", 6, false),
},
];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
let mut b = [0; 15];
assert_eq!(pipe.server.stream_recv(0, &mut b), Ok((11, false)));
assert_eq!(&b[..11], b"aaaaabbbccc");
}
#[test]
fn stream_data_overlap_with_reordering() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"aaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"ccccc", 6, false),
},
frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"bbbbb", 3, false),
},
];
let pkt_type = packet::Type::Short;
assert!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf).is_ok());
let mut b = [0; 15];
assert_eq!(pipe.server.stream_recv(0, &mut b), Ok((11, false)));
assert_eq!(&b[..11], b"aaaaabccccc");
}
#[test]
fn reset_stream_flow_control() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [
frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
},
frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"a", 0, false),
},
frame::Frame::ResetStream {
stream_id: 8,
error_code: 0,
final_size: 15,
},
frame::Frame::Stream {
stream_id: 12,
data: stream::RangeBuf::from(b"a", 0, false),
},
];
let pkt_type = packet::Type::Short;
assert_eq!(
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf),
Err(Error::FlowControl),
);
}
#[test]
fn path_challenge() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::PathChallenge {
data: vec![0xba; 8],
}];
let pkt_type = packet::Type::Short;
let len = pipe
.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
assert!(len > 0);
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next().unwrap();
assert_eq!(
iter.next(),
Some(&frame::Frame::PathResponse {
data: vec![0xba; 8],
})
);
}
#[test]
fn early_1rtt_packet() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
let flight = testing::emit_flight(&mut pipe.client).unwrap();
testing::process_flight(&mut pipe.server, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.server).unwrap();
testing::process_flight(&mut pipe.client, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.client).unwrap();
let delayed = flight.clone();
testing::emit_flight(&mut pipe.server).ok();
assert!(pipe.client.is_established());
let frames = [frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"hello, world", 0, true),
}];
let pkt_type = packet::Type::Short;
let written =
testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
.unwrap();
assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
let frames = [frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"hello, world", 0, true),
}];
let written =
testing::encode_pkt(&mut pipe.client, pkt_type, &frames, &mut buf)
.unwrap();
assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
assert!(!pipe.server.is_established());
assert_eq!(
pipe.server.pkt_num_spaces[packet::EPOCH_APPLICATION]
.largest_rx_pkt_num,
0
);
testing::process_flight(&mut pipe.server, delayed).unwrap();
assert!(pipe.server.is_established());
assert_eq!(
pipe.server.pkt_num_spaces[packet::EPOCH_APPLICATION]
.largest_rx_pkt_num,
0
);
}
#[test]
fn stop_sending() {
let mut b = [0; 15];
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello", true), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.server.stream_recv(4, &mut b), Ok((5, true)));
assert!(pipe.server.stream_finished(4));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
let mut r = pipe.server.writable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
loop {
if pipe.server.stream_send(4, b"world", false) == Ok(0) {
break;
}
assert_eq!(pipe.advance(), Ok(()));
}
let mut r = pipe.server.writable();
assert_eq!(r.next(), None);
let frames = [frame::Frame::StopSending {
stream_id: 4,
error_code: 42,
}];
let pkt_type = packet::Type::Short;
let len = pipe
.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next();
assert_eq!(
iter.next(),
Some(&frame::Frame::ResetStream {
stream_id: 4,
error_code: 42,
final_size: 15,
})
);
let mut r = pipe.server.writable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(
pipe.server.stream_send(4, b"world", true),
Err(Error::StreamStopped(42)),
);
assert_eq!(pipe.server.streams.len(), 1);
let mut ranges = ranges::RangeSet::default();
ranges.insert(0..6);
let frames = [frame::Frame::ACK {
ack_delay: 15,
ranges,
}];
assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(0));
assert_eq!(pipe.server.streams.len(), 0);
let frames = [frame::Frame::StopSending {
stream_id: 4,
error_code: 42,
}];
let len = pipe
.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
assert_eq!(frames.len(), 1);
match frames.iter().next() {
Some(frame::Frame::ACK { .. }) => (),
f => panic!("expected ACK frame, got {:?}", f),
};
let mut r = pipe.server.writable();
assert_eq!(r.next(), None);
}
#[test]
fn stop_sending_fin() {
let mut b = [0; 15];
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello", true), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.server.stream_recv(4, &mut b), Ok((5, true)));
assert!(pipe.server.stream_finished(4));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
let mut r = pipe.server.writable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.server.stream_send(4, b"world", true), Ok(5));
let frames = [frame::Frame::StopSending {
stream_id: 4,
error_code: 42,
}];
let pkt_type = packet::Type::Short;
let len = pipe
.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next();
assert_eq!(
iter.next(),
Some(&frame::Frame::ResetStream {
stream_id: 4,
error_code: 42,
final_size: 5,
})
);
assert_eq!(iter.next(), None);
}
#[test]
fn stream_shutdown_read() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.client.streams.len(), 1);
assert_eq!(pipe.server.streams.len(), 1);
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 42), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
let len = pipe.server.send(&mut buf).unwrap();
let mut dummy = buf[..len].to_vec();
let frames =
testing::decode_pkt(&mut pipe.client, &mut dummy, len).unwrap();
let mut iter = frames.iter();
assert_eq!(
iter.next(),
Some(&frame::Frame::StopSending {
stream_id: 4,
error_code: 42,
})
);
assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.client.writable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(
pipe.client.stream_send(4, b"bye", false),
Err(Error::StreamStopped(42))
);
assert_eq!(pipe.server.stream_send(4, b"hello, world", true), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.client.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.client.stream_recv(4, &mut buf), Ok((12, true)));
assert_eq!(pipe.client.streams.len(), 0);
assert_eq!(pipe.server.streams.len(), 0);
assert_eq!(
pipe.server.stream_shutdown(4, Shutdown::Read, 0),
Err(Error::Done)
);
}
#[test]
fn stream_shutdown_read_after_fin() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.client.streams.len(), 1);
assert_eq!(pipe.server.streams.len(), 1);
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 42), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(4, b"hello, world", true), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.client.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.client.stream_recv(4, &mut buf), Ok((12, true)));
assert_eq!(pipe.client.streams.len(), 0);
assert_eq!(pipe.server.streams.len(), 0);
assert_eq!(
pipe.server.stream_shutdown(4, Shutdown::Read, 0),
Err(Error::Done)
);
}
#[test]
fn stream_shutdown_write() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
let mut r = pipe.server.writable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.client.streams.len(), 1);
assert_eq!(pipe.server.streams.len(), 1);
assert_eq!(pipe.server.stream_send(4, b"goodbye, world", false), Ok(14));
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Write, 42), Ok(()));
let mut r = pipe.server.writable();
assert_eq!(r.next(), None);
let len = pipe.server.send(&mut buf).unwrap();
let mut dummy = buf[..len].to_vec();
let frames =
testing::decode_pkt(&mut pipe.client, &mut dummy, len).unwrap();
let mut iter = frames.iter();
assert_eq!(
iter.next(),
Some(&frame::Frame::ResetStream {
stream_id: 4,
error_code: 42,
final_size: 14,
})
);
assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(
pipe.server.stream_send(4, b"bye", false),
Err(Error::FinalSize)
);
assert_eq!(pipe.client.stream_send(4, b"bye", true), Ok(3));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((15, true)));
assert_eq!(pipe.server.streams.len(), 0);
assert_eq!(
pipe.server.stream_shutdown(4, Shutdown::Write, 0),
Err(Error::Done)
);
}
#[test]
fn stream_round_robin() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
let len = pipe.client.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next();
assert_eq!(
iter.next(),
Some(&frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"aaaaa", 0, false),
})
);
let len = pipe.client.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"aaaaa", 0, false),
})
);
let len = pipe.client.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"aaaaa", 0, false),
})
);
}
#[test]
fn stream_readable() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let mut r = pipe.client.readable();
assert_eq!(r.next(), None);
assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
let mut r = pipe.client.readable();
assert_eq!(r.next(), None);
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(
pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.client.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
let mut b = [0; 15];
pipe.client.stream_recv(4, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.client.readable();
assert_eq!(r.next(), None);
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(12, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.len(), 2);
assert!(r.next().is_some());
assert!(r.next().is_some());
assert!(r.next().is_none());
assert_eq!(r.len(), 0);
}
#[test]
fn stream_writable() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let mut w = pipe.client.writable();
assert_eq!(w.next(), None);
assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
let mut w = pipe.client.writable();
assert_eq!(w.next(), Some(4));
assert_eq!(w.next(), None);
assert_eq!(pipe.advance(), Ok(()));
let mut w = pipe.server.writable();
assert_eq!(w.next(), Some(4));
assert_eq!(w.next(), None);
assert_eq!(
pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
let mut w = pipe.server.writable();
assert_eq!(w.next(), None);
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.client.stream_recv(4, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let mut w = pipe.server.writable();
assert_eq!(w.next(), Some(4));
assert_eq!(w.next(), None);
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
let mut w = pipe.server.writable();
assert_eq!(w.next(), None);
assert_eq!(pipe.client.stream_send(8, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(12, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
let mut w = pipe.server.writable();
assert_eq!(w.len(), 2);
assert!(w.next().is_some());
assert!(w.next().is_some());
assert!(w.next().is_none());
assert_eq!(w.len(), 0);
assert_eq!(pipe.server.stream_send(12, b"aaaaa", true), Ok(5));
let mut w = pipe.server.writable();
assert_eq!(w.next(), Some(8));
assert_eq!(w.next(), None);
}
#[test]
fn flow_control_limit_send() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(
pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(
pipe.client.stream_send(4, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(0));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert!(r.next().is_some());
assert!(r.next().is_some());
assert!(r.next().is_none());
}
#[test]
fn invalid_initial_server() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
let frames = [frame::Frame::Padding { len: 10 }];
let written = testing::encode_pkt(
&mut pipe.client,
packet::Type::Initial,
&frames,
&mut buf,
)
.unwrap();
buf[written - 1] = !buf[written - 1];
assert_eq!(pipe.server.timeout(), None);
assert_eq!(
pipe.server.recv(&mut buf[..written]),
Err(Error::CryptoFail)
);
assert!(pipe.server.is_closed());
}
#[test]
fn invalid_initial_client() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(1200));
let frames = [frame::Frame::Padding { len: 10 }];
let written = testing::encode_pkt(
&mut pipe.server,
packet::Type::Initial,
&frames,
&mut buf,
)
.unwrap();
buf[written - 1] = !buf[written - 1];
assert_eq!(pipe.client.recv(&mut buf[..written]), Ok(71));
assert_eq!(pipe.client.is_closed(), false);
assert!(pipe.client.idle_timer.is_some());
}
#[test]
fn invalid_initial_payload() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
let mut b = octets::OctetsMut::with_slice(&mut buf);
let epoch = packet::Type::Initial.to_epoch().unwrap();
let pn = 0;
let pn_len = packet::pkt_num_len(pn).unwrap();
let hdr = Header {
ty: packet::Type::Initial,
version: pipe.client.version,
dcid: ConnectionId::from_ref(&pipe.client.dcid),
scid: ConnectionId::from_ref(&pipe.client.scid),
pkt_num: 0,
pkt_num_len: pn_len,
token: pipe.client.token.clone(),
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b).unwrap();
let payload_len = 4096;
let len = pn_len + payload_len;
b.put_varint(len as u64).unwrap();
packet::encode_pkt_num(pn, &mut b).unwrap();
let payload_offset = b.off();
let frames = [frame::Frame::Padding { len: 10 }];
for frame in &frames {
frame.to_bytes(&mut b).unwrap();
}
let space = &mut pipe.client.pkt_num_spaces[epoch];
let payload_len = frames.iter().fold(0, |acc, x| acc + x.wire_len()) +
space.crypto_overhead().unwrap();
let aead = space.crypto_seal.as_ref().unwrap();
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
aead,
)
.unwrap();
assert_eq!(pipe.server.timeout(), None);
assert_eq!(
pipe.server.recv(&mut buf[..written]),
Err(Error::BufferTooShort)
);
assert!(pipe.server.is_closed());
}
#[test]
fn invalid_packet() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let frames = [frame::Frame::Padding { len: 10 }];
let written = testing::encode_pkt(
&mut pipe.client,
packet::Type::Short,
&frames,
&mut buf,
)
.unwrap();
buf[written - 1] = !buf[written - 1];
assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
buf[0] = 255;
assert_eq!(pipe.server.recv(&mut buf[..written]), Ok(written));
}
#[test]
fn recv_empty_buffer() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.server.recv(&mut buf[..0]), Err(Error::BufferTooShort));
}
#[test]
fn stream_limit_update_bidi() {
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(0);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"b", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"b", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
pipe.server.stream_recv(4, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(0, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(4, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(4, b"b", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(0, b"b", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(16, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(
pipe.client.stream_send(20, b"a", false),
Err(Error::StreamLimit)
);
assert_eq!(pipe.server.readable().len(), 3);
}
#[test]
fn stream_limit_update_uni() {
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(0);
config.set_initial_max_streams_uni(3);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(2, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(6, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(6, b"b", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(2, b"b", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.server.stream_recv(2, &mut b).unwrap();
pipe.server.stream_recv(6, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(10, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(14, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(18, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(
pipe.client.stream_send(22, b"a", false),
Err(Error::StreamLimit)
);
assert_eq!(pipe.server.readable().len(), 3);
}
#[test]
fn stream_zero_length_fin() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(
pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(0));
assert!(r.next().is_none());
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(0));
assert!(r.next().is_none());
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
}
#[test]
fn stream_zero_length_fin_deferred_collection() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(
pipe.client.stream_send(0, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(0));
assert!(r.next().is_none());
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(0, b"", true), Ok(0));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(0));
assert!(r.next().is_none());
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"", true), Ok(0));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable();
assert_eq!(r.next(), None);
let mut r = pipe.client.readable();
assert_eq!(r.next(), Some(0));
pipe.client.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.client.readable();
assert_eq!(r.next(), None);
}
#[test]
fn collect_streams() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.streams.len(), 0);
assert_eq!(pipe.server.streams.len(), 0);
assert_eq!(pipe.client.stream_send(0, b"aaaaa", true), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
assert!(!pipe.client.stream_finished(0));
assert!(!pipe.server.stream_finished(0));
assert_eq!(pipe.client.streams.len(), 1);
assert_eq!(pipe.server.streams.len(), 1);
let mut b = [0; 5];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.stream_send(0, b"aaaaa", true), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
assert!(!pipe.client.stream_finished(0));
assert!(pipe.server.stream_finished(0));
assert_eq!(pipe.client.streams.len(), 1);
assert_eq!(pipe.server.streams.len(), 0);
let mut b = [0; 5];
pipe.client.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.streams.len(), 0);
assert_eq!(pipe.server.streams.len(), 0);
assert!(pipe.client.stream_finished(0));
assert!(pipe.server.stream_finished(0));
assert_eq!(pipe.client.stream_send(0, b"", true), Err(Error::Done));
let frames = [frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"aa", 0, false),
}];
let pkt_type = packet::Type::Short;
assert_eq!(pipe.send_pkt_to_server(pkt_type, &frames, &mut buf), Ok(39));
}
#[test]
fn config_set_cc_algorithm_name() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
assert_eq!(config.set_cc_algorithm_name("reno"), Ok(()));
assert_eq!(
config.set_cc_algorithm_name("???"),
Err(Error::CongestionControl)
);
}
#[test]
fn peer_cert() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
match pipe.client.peer_cert() {
Some(c) => assert_eq!(c.len(), 753),
None => panic!("missing server certificate"),
}
}
#[test]
fn retry() {
let mut buf = [0; 65535];
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\06proto2")
.unwrap();
let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
let mut len = pipe.client.send(&mut buf).unwrap();
let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
let odcid = hdr.dcid.clone();
let mut scid = [0; MAX_CONN_ID_LEN];
rand::rand_bytes(&mut scid[..]);
let scid = ConnectionId::from_ref(&scid);
let token = b"quiche test retry token";
len = packet::retry(
&hdr.scid,
&hdr.dcid,
&scid,
token,
hdr.version,
&mut buf,
)
.unwrap();
assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
len = pipe.client.send(&mut buf).unwrap();
let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
assert_eq!(&hdr.token.unwrap(), token);
pipe.server = accept(&scid, Some(&odcid), &mut config).unwrap();
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
assert_eq!(pipe.advance(), Ok(()));
assert!(pipe.client.is_established());
assert!(pipe.server.is_established());
}
#[test]
fn missing_retry_source_connection_id() {
let mut buf = [0; 65535];
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\06proto2")
.unwrap();
let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
let mut len = pipe.client.send(&mut buf).unwrap();
let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
let mut scid = [0; MAX_CONN_ID_LEN];
rand::rand_bytes(&mut scid[..]);
let scid = ConnectionId::from_ref(&scid);
let token = b"quiche test retry token";
len = packet::retry(
&hdr.scid,
&hdr.dcid,
&scid,
token,
hdr.version,
&mut buf,
)
.unwrap();
assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
len = pipe.client.send(&mut buf).unwrap();
pipe.server = accept(&scid, None, &mut config).unwrap();
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
let flight = testing::emit_flight(&mut pipe.server).unwrap();
assert_eq!(
testing::process_flight(&mut pipe.client, flight),
Err(Error::InvalidTransportParam)
);
}
#[test]
fn invalid_retry_source_connection_id() {
let mut buf = [0; 65535];
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\06proto2")
.unwrap();
let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
let mut len = pipe.client.send(&mut buf).unwrap();
let hdr = Header::from_slice(&mut buf[..len], MAX_CONN_ID_LEN).unwrap();
let mut scid = [0; MAX_CONN_ID_LEN];
rand::rand_bytes(&mut scid[..]);
let scid = ConnectionId::from_ref(&scid);
let token = b"quiche test retry token";
len = packet::retry(
&hdr.scid,
&hdr.dcid,
&scid,
token,
hdr.version,
&mut buf,
)
.unwrap();
assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));
len = pipe.client.send(&mut buf).unwrap();
let odcid = ConnectionId::from_ref(b"bogus value");
pipe.server = accept(&scid, Some(&odcid), &mut config).unwrap();
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
let flight = testing::emit_flight(&mut pipe.server).unwrap();
assert_eq!(
testing::process_flight(&mut pipe.client, flight),
Err(Error::InvalidTransportParam)
);
}
fn check_send(_: &mut impl Send) {}
#[test]
fn config_must_be_send() {
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
check_send(&mut config);
}
#[test]
fn connection_must_be_send() {
let mut pipe = testing::Pipe::default().unwrap();
check_send(&mut pipe.client);
}
fn check_sync(_: &mut impl Sync) {}
#[test]
fn config_must_be_sync() {
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
check_sync(&mut config);
}
#[test]
fn connection_must_be_sync() {
let mut pipe = testing::Pipe::default().unwrap();
check_sync(&mut pipe.client);
}
#[test]
fn data_blocked() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"aaaaaaaaaa", false), Ok(10));
assert_eq!(pipe.client.blocked_limit, None);
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"aaaaaaaaaa", false), Ok(10));
assert_eq!(pipe.client.blocked_limit, None);
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"aaaaaaaaaaa", false), Ok(10));
assert_eq!(pipe.client.blocked_limit, Some(30));
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(pipe.client.blocked_limit, None);
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
let mut iter = frames.iter();
assert_eq!(iter.next(), Some(&frame::Frame::DataBlocked { limit: 30 }));
assert_eq!(
iter.next(),
Some(&frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"aaaaaaaaaa", 0, false),
})
);
assert_eq!(iter.next(), None);
}
#[test]
fn stream_data_blocked() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.streams.blocked().len(), 0);
assert_eq!(pipe.client.stream_send(0, b"aaaaa", false), Ok(5));
assert_eq!(pipe.client.streams.blocked().len(), 0);
assert_eq!(pipe.client.stream_send(0, b"aaaaaa", false), Ok(5));
assert_eq!(pipe.client.streams.blocked().len(), 1);
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(pipe.client.streams.blocked().len(), 0);
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next();
assert_eq!(
iter.next(),
Some(&frame::Frame::StreamDataBlocked {
stream_id: 0,
limit: 15,
})
);
assert_eq!(
iter.next(),
Some(&frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
})
);
assert_eq!(iter.next(), None);
assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(pipe.client.streams.blocked().len(), 0);
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
let mut iter = frames.iter();
assert_eq!(
iter.next(),
Some(&frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"a", 0, false),
})
);
assert_eq!(iter.next(), None);
assert_eq!(pipe.client.stream_send(0, b"aaaaaa", false), Ok(0));
assert_eq!(pipe.client.streams.blocked().len(), 1);
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(pipe.client.streams.blocked().len(), 0);
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
let mut iter = frames.iter();
assert_eq!(
iter.next(),
Some(&frame::Frame::StreamDataBlocked {
stream_id: 0,
limit: 15,
})
);
assert_eq!(iter.next(), Some(&frame::Frame::Padding { len: 1 }));
assert_eq!(iter.next(), None);
}
#[test]
fn app_limited_true() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(50000);
config.set_initial_max_stream_data_bidi_local(50000);
config.set_initial_max_stream_data_bidi_remote(50000);
config.set_max_recv_udp_payload_size(1200);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let send_buf = [0; 10000];
assert_eq!(pipe.server.stream_send(0, &send_buf, false), Ok(10000));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.server.recovery.app_limited(), true);
}
#[test]
fn app_limited_false() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(50000);
config.set_initial_max_stream_data_bidi_local(50000);
config.set_initial_max_stream_data_bidi_remote(50000);
config.set_max_recv_udp_payload_size(1200);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let send_buf1 = [0; 20000];
assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(12000));
testing::emit_flight(&mut pipe.server).ok();
assert_eq!(pipe.server.recovery.app_limited(), false);
}
#[test]
fn app_limited_false_no_frame() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(50000);
config.set_initial_max_stream_data_bidi_local(50000);
config.set_initial_max_stream_data_bidi_remote(50000);
config.set_max_recv_udp_payload_size(1405);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let send_buf1 = [0; 20000];
assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(12000));
testing::emit_flight(&mut pipe.server).ok();
assert_eq!(pipe.server.recovery.app_limited(), false);
}
#[test]
fn app_limited_false_no_header() {
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(50000);
config.set_initial_max_stream_data_bidi_local(50000);
config.set_initial_max_stream_data_bidi_remote(50000);
config.set_max_recv_udp_payload_size(1406);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_client_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", true), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 15];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
let send_buf1 = [0; 20000];
assert_eq!(pipe.server.stream_send(0, &send_buf1, false), Ok(12000));
testing::emit_flight(&mut pipe.server).ok();
assert_eq!(pipe.server.recovery.app_limited(), false);
}
#[test]
fn limit_ack_ranges() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
let epoch = packet::EPOCH_APPLICATION;
assert_eq!(pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.len(), 0);
let frames = [frame::Frame::Ping, frame::Frame::Padding { len: 3 }];
let pkt_type = packet::Type::Short;
let mut last_packet_sent = 0;
for _ in 0..512 {
let recv_count = pipe.server.recv_count;
last_packet_sent = pipe.client.pkt_num_spaces[epoch].next_pkt_num;
pipe.send_pkt_to_server(pkt_type, &frames, &mut buf)
.unwrap();
assert_eq!(pipe.server.recv_count, recv_count + 1);
pipe.client.pkt_num_spaces[epoch].next_pkt_num += 1;
}
assert_eq!(
pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.len(),
MAX_ACK_RANGES
);
assert_eq!(
pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.first(),
Some(last_packet_sent - ((MAX_ACK_RANGES as u64) - 1) * 2)
);
assert_eq!(
pipe.server.pkt_num_spaces[epoch].recv_pkt_need_ack.last(),
Some(last_packet_sent)
);
}
#[test]
fn stream_priority() {
const MAX_TEST_PACKET_SIZE: usize = 540;
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(1_000_000);
config.set_initial_max_stream_data_bidi_local(1_000_000);
config.set_initial_max_stream_data_bidi_remote(1_000_000);
config.set_initial_max_stream_data_uni(0);
config.set_initial_max_streams_bidi(100);
config.set_initial_max_streams_uni(0);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(16, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(20, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 1];
let out = [b'b'; 500];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
pipe.server.stream_send(0, &out, false).unwrap();
pipe.server.stream_send(0, &out, false).unwrap();
pipe.server.stream_send(0, &out, false).unwrap();
pipe.server.stream_recv(12, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(12, 42, true), Ok(()));
pipe.server.stream_send(12, &out, false).unwrap();
pipe.server.stream_send(12, &out, false).unwrap();
pipe.server.stream_send(12, &out, false).unwrap();
pipe.server.stream_recv(16, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(16, 10, false), Ok(()));
pipe.server.stream_send(16, &out, false).unwrap();
pipe.server.stream_send(16, &out, false).unwrap();
pipe.server.stream_send(16, &out, false).unwrap();
pipe.server.stream_recv(4, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(4, 42, true), Ok(()));
pipe.server.stream_send(4, &out, false).unwrap();
pipe.server.stream_send(4, &out, false).unwrap();
pipe.server.stream_send(4, &out, false).unwrap();
pipe.server.stream_recv(8, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(8, 10, false), Ok(()));
pipe.server.stream_send(8, &out, false).unwrap();
pipe.server.stream_send(8, &out, false).unwrap();
pipe.server.stream_send(8, &out, false).unwrap();
pipe.server.stream_recv(20, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(20, 42, false), Ok(()));
pipe.server.stream_send(20, &out, false).unwrap();
pipe.server.stream_send(20, &out, false).unwrap();
pipe.server.stream_send(20, &out, false).unwrap();
let mut off = 0;
for _ in 1..=3 {
let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let stream = frames.iter().next().unwrap();
assert_eq!(stream, &frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(&out, off, false),
});
off = match stream {
frame::Frame::Stream { data, .. } => data.max_off(),
_ => unreachable!(),
};
}
let mut off = 0;
for _ in 1..=3 {
let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let stream = frames.iter().next().unwrap();
assert_eq!(stream, &frame::Frame::Stream {
stream_id: 16,
data: stream::RangeBuf::from(&out, off, false),
});
off = match stream {
frame::Frame::Stream { data, .. } => data.max_off(),
_ => unreachable!(),
};
}
let mut off = 0;
for _ in 1..=3 {
let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let stream = frames.iter().next().unwrap();
assert_eq!(stream, &frame::Frame::Stream {
stream_id: 20,
data: stream::RangeBuf::from(&out, off, false),
});
off = match stream {
frame::Frame::Stream { data, .. } => data.max_off(),
_ => unreachable!(),
};
}
let mut off = 0;
for _ in 1..=3 {
let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 12,
data: stream::RangeBuf::from(&out, off, false),
})
);
let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let stream = frames.iter().next().unwrap();
assert_eq!(stream, &frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(&out, off, false),
});
off = match stream {
frame::Frame::Stream { data, .. } => data.max_off(),
_ => unreachable!(),
};
}
let mut off = 0;
for _ in 1..=3 {
let len = pipe.server.send(&mut buf[..MAX_TEST_PACKET_SIZE]).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
let stream = frames.iter().next().unwrap();
assert_eq!(stream, &frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(&out, off, false),
});
off = match stream {
frame::Frame::Stream { data, .. } => data.max_off(),
_ => unreachable!(),
};
}
assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
}
#[test]
#[should_panic]
fn stream_reprioritize() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(0);
config.set_initial_max_streams_bidi(5);
config.set_initial_max_streams_uni(0);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(12, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
let mut b = [0; 1];
pipe.server.stream_recv(0, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(0, 255, true), Ok(()));
pipe.server.stream_send(0, b"b", false).unwrap();
pipe.server.stream_recv(12, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(12, 42, true), Ok(()));
pipe.server.stream_send(12, b"b", false).unwrap();
pipe.server.stream_recv(8, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(8, 10, true), Ok(()));
pipe.server.stream_send(8, b"b", false).unwrap();
pipe.server.stream_recv(4, &mut b).unwrap();
assert_eq!(pipe.server.stream_priority(4, 42, true), Ok(()));
pipe.server.stream_send(4, b"b", false).unwrap();
assert_eq!(pipe.server.stream_priority(0, 20, true), Ok(()));
let len = pipe.server.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 8,
data: stream::RangeBuf::from(b"b", 0, false),
})
);
let len = pipe.server.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 0,
data: stream::RangeBuf::from(b"b", 0, false),
})
);
let len = pipe.server.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 12,
data: stream::RangeBuf::from(b"b", 0, false),
})
);
let len = pipe.server.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.client, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"b", 0, false),
})
);
assert_eq!(pipe.server.send(&mut buf), Err(Error::Done));
}
#[test]
fn early_retransmit() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"a", false), Ok(1));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"b", false), Ok(1));
assert!(pipe.client.send(&mut buf).is_ok());
let timer = pipe.client.timeout().unwrap();
std::thread::sleep(timer + time::Duration::from_millis(1));
pipe.client.on_timeout();
let epoch = packet::EPOCH_APPLICATION;
assert_eq!(pipe.client.recovery.loss_probes[epoch], 1);
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(pipe.client.recovery.loss_probes[epoch], 0);
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
let mut iter = frames.iter();
iter.next();
assert_eq!(
iter.next(),
Some(&frame::Frame::Stream {
stream_id: 4,
data: stream::RangeBuf::from(b"b", 0, false),
})
);
}
#[test]
fn handshake_anti_deadlock() {
let mut buf = [0; 65535];
let mut config = Config::new(PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert-big.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\06proto2")
.unwrap();
let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();
assert_eq!(pipe.client.handshake_status().has_handshake_keys, false);
assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
assert_eq!(pipe.server.handshake_status().has_handshake_keys, false);
assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(len, 1200);
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
let flight = testing::emit_flight(&mut pipe.server).unwrap();
assert_eq!(pipe.client.handshake_status().has_handshake_keys, false);
assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
assert_eq!(pipe.server.handshake_status().has_handshake_keys, true);
assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
testing::process_flight(&mut pipe.client, flight).unwrap();
testing::emit_flight(&mut pipe.client).unwrap();
assert_eq!(pipe.client.handshake_status().has_handshake_keys, true);
assert_eq!(pipe.client.handshake_status().peer_verified_address, false);
assert_eq!(pipe.server.handshake_status().has_handshake_keys, true);
assert_eq!(pipe.server.handshake_status().peer_verified_address, true);
assert!(pipe.client.timeout().is_some());
}
#[test]
fn handshake_packet_type_corruption() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
let len = pipe.client.send(&mut buf).unwrap();
assert_eq!(len, 1200);
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
let flight = testing::emit_flight(&mut pipe.server).unwrap();
testing::process_flight(&mut pipe.client, flight).unwrap();
let (ty, len) = pipe.client.send_single(&mut buf).unwrap();
assert_eq!(ty, Type::Initial);
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
let (ty, len) = pipe.client.send_single(&mut buf).unwrap();
assert_eq!(ty, Type::Handshake);
buf[0] &= !(0x20);
let hdr = Header::from_slice(&mut buf[..len], 0).unwrap();
assert_eq!(hdr.ty, Type::Initial);
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
}
#[test]
fn dgram_send_fails_invalidstate() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(
pipe.client.dgram_send(b"hello, world"),
Err(Error::InvalidState)
);
}
#[test]
fn dgram_send_app_limited() {
let mut buf = [0; 65535];
let send_buf = [0xcf; 1000];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 1000, 1000);
config.set_max_recv_udp_payload_size(1200);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
for _ in 0..1000 {
assert_eq!(pipe.client.dgram_send(&send_buf), Ok(()));
}
assert!(!pipe.client.recovery.app_limited());
assert_eq!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
let len = pipe.client.send(&mut buf).unwrap();
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
assert!(!pipe.client.recovery.app_limited());
assert_eq!(pipe.server.recv(&mut buf[..len]), Ok(len));
let flight = testing::emit_flight(&mut pipe.client).unwrap();
testing::process_flight(&mut pipe.server, flight).unwrap();
let flight = testing::emit_flight(&mut pipe.server).unwrap();
testing::process_flight(&mut pipe.client, flight).unwrap();
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 0);
assert_ne!(pipe.client.dgram_send_queue.byte_size(), 1_000_000);
assert!(!pipe.client.recovery.app_limited());
}
#[test]
fn dgram_single_datagram() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 10, 10);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
let result1 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result1, Ok(12));
let result2 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result2, Err(Error::Done));
}
#[test]
fn dgram_multiple_datagrams() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 10, 10);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Ok(()));
pipe.client
.dgram_purge_outgoing(|d: &[u8]| -> bool { d[0] == b'c' });
assert_eq!(pipe.advance(), Ok(()));
let result1 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result1, Ok(12));
assert_eq!(buf[0], b'h');
assert_eq!(buf[1], b'e');
let result2 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result2, Ok(11));
assert_eq!(buf[0], b'h');
assert_eq!(buf[1], b'o');
let result3 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result3, Err(Error::Done));
}
#[test]
fn dgram_send_queue_overflow() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 10, 2);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Err(Error::Done));
assert_eq!(pipe.advance(), Ok(()));
let result1 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result1, Ok(12));
assert_eq!(buf[0], b'h');
assert_eq!(buf[1], b'e');
let result2 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result2, Ok(11));
assert_eq!(buf[0], b'c');
assert_eq!(buf[1], b'i');
let result3 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result3, Err(Error::Done));
}
#[test]
fn dgram_recv_queue_overflow() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 2, 10);
config.set_max_recv_udp_payload_size(1200);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hello, world"), Ok(()));
assert_eq!(pipe.client.dgram_send(b"ciao, mondo"), Ok(()));
assert_eq!(pipe.client.dgram_send(b"hola, mundo"), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
let result1 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result1, Ok(11));
assert_eq!(buf[0], b'c');
assert_eq!(buf[1], b'i');
let result2 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result2, Ok(11));
assert_eq!(buf[0], b'h');
assert_eq!(buf[1], b'o');
let result3 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result3, Err(Error::Done));
}
#[test]
fn dgram_send_max_size() {
let mut buf = [0; MAX_DGRAM_FRAME_SIZE as usize];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 10, 10);
config.set_max_recv_udp_payload_size(1452);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.client.dgram_max_writable_len(), None);
assert_eq!(pipe.handshake(), Ok(()));
let max_dgram_size = pipe.client.dgram_max_writable_len().unwrap();
assert_eq!(max_dgram_size, 1160);
let dgram_packet: Vec<u8> = vec![42; max_dgram_size];
assert_eq!(pipe.client.dgram_send(&dgram_packet), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
let result1 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result1, Ok(max_dgram_size));
let result2 = pipe.server.dgram_recv(&mut buf);
assert_eq!(result2, Err(Error::Done));
}
#[test]
fn is_readable() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_stream_data_uni(10);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.enable_dgram(true, 10, 10);
config.set_max_recv_udp_payload_size(1452);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.is_readable(), false);
assert_eq!(pipe.server.is_readable(), false);
assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.is_readable(), false);
assert_eq!(pipe.server.is_readable(), true);
assert_eq!(
pipe.server.stream_send(4, b"aaaaaaaaaaaaaaa", false),
Ok(15)
);
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.is_readable(), true);
assert_eq!(pipe.server.is_readable(), true);
let mut b = [0; 15];
pipe.client.stream_recv(4, &mut b).unwrap();
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.is_readable(), false);
assert_eq!(pipe.server.is_readable(), true);
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));
assert_eq!(pipe.server.is_readable(), false);
assert_eq!(pipe.client.dgram_send(b"dddddddddddddd"), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.is_readable(), false);
assert_eq!(pipe.server.is_readable(), true);
assert_eq!(pipe.server.dgram_send(b"dddddddddddddd"), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.is_readable(), true);
assert_eq!(pipe.server.is_readable(), true);
let r = pipe.server.dgram_recv(&mut buf);
assert_eq!(r, Ok(14));
assert_eq!(pipe.server.is_readable(), false);
let r = pipe.client.dgram_recv(&mut buf);
assert_eq!(r, Ok(14));
assert_eq!(pipe.client.is_readable(), false);
}
#[test]
fn close() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.close(false, 0x1234, b"hello?"), Ok(()));
assert_eq!(
pipe.client.close(false, 0x4321, b"hello?"),
Err(Error::Done)
);
let len = pipe.client.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::ConnectionClose {
error_code: 0x1234,
frame_type: 0,
reason: b"hello?".to_vec(),
})
);
}
#[test]
fn app_close() {
let mut buf = [0; 65535];
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.close(true, 0x1234, b"hello!"), Ok(()));
assert_eq!(pipe.client.close(true, 0x4321, b"hello!"), Err(Error::Done));
let len = pipe.client.send(&mut buf).unwrap();
let frames =
testing::decode_pkt(&mut pipe.server, &mut buf, len).unwrap();
assert_eq!(
frames.iter().next(),
Some(&frame::Frame::ApplicationClose {
error_code: 0x1234,
reason: b"hello!".to_vec(),
})
);
}
#[test]
fn peer_error() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.server.close(false, 0x1234, b"hello?"), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(
pipe.client.peer_error(),
Some(&ConnectionError {
is_app: false,
error_code: 0x1234u64,
reason: b"hello?".to_vec()
})
);
}
#[test]
fn app_peer_error() {
let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.server.close(true, 0x1234, b"hello!"), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(
pipe.client.peer_error(),
Some(&ConnectionError {
is_app: true,
error_code: 0x1234u64,
reason: b"hello!".to_vec()
})
);
}
#[test]
fn update_max_datagram_size() {
let mut client_scid = [0; 16];
rand::rand_bytes(&mut client_scid[..]);
let client_scid = ConnectionId::from_ref(&client_scid);
let mut server_scid = [0; 16];
rand::rand_bytes(&mut server_scid[..]);
let server_scid = ConnectionId::from_ref(&server_scid);
let mut client_config = Config::new(crate::PROTOCOL_VERSION).unwrap();
client_config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
client_config.set_max_recv_udp_payload_size(1200);
let mut server_config = Config::new(crate::PROTOCOL_VERSION).unwrap();
server_config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
server_config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
server_config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
server_config.verify_peer(false);
server_config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
server_config.set_max_send_udp_payload_size(1500);
let mut pipe = testing::Pipe {
client: connect(Some("quic.tech"), &client_scid, &mut client_config)
.unwrap(),
server: accept(&server_scid, None, &mut server_config).unwrap(),
};
assert_eq!(pipe.server.recovery.max_datagram_size(), 1500);
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.server.recovery.max_datagram_size(), 1200);
assert_eq!(pipe.server.recovery.cwnd(), 12000);
}
#[test]
fn send_capacity() {
let mut buf = [0; 65535];
let mut config = Config::new(crate::PROTOCOL_VERSION).unwrap();
config
.load_cert_chain_from_pem_file("examples/cert.crt")
.unwrap();
config
.load_priv_key_from_pem_file("examples/cert.key")
.unwrap();
config
.set_application_protos(b"\x06proto1\x06proto2")
.unwrap();
config.set_initial_max_data(100000);
config.set_initial_max_stream_data_bidi_local(10000);
config.set_initial_max_stream_data_bidi_remote(10000);
config.set_initial_max_streams_bidi(10);
config.verify_peer(false);
let mut pipe = testing::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));
assert_eq!(pipe.client.stream_send(0, b"hello!", true), Ok(6));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(4, b"hello!", true), Ok(6));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(8, b"hello!", true), Ok(6));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.stream_send(12, b"hello!", true), Ok(6));
assert_eq!(pipe.advance(), Ok(()));
let mut r = pipe.server.readable().collect::<Vec<u64>>();
assert_eq!(r.len(), 4);
r.sort();
assert_eq!(r, [0, 4, 8, 12]);
assert_eq!(pipe.server.stream_recv(0, &mut buf), Ok((6, true)));
assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((6, true)));
assert_eq!(pipe.server.stream_recv(8, &mut buf), Ok((6, true)));
assert_eq!(pipe.server.stream_recv(12, &mut buf), Ok((6, true)));
assert_eq!(pipe.server.tx_cap, 12000);
assert_eq!(pipe.server.stream_send(0, &buf[..5000], false), Ok(5000));
assert_eq!(pipe.server.stream_send(4, &buf[..5000], false), Ok(5000));
assert_eq!(pipe.server.stream_send(8, &buf[..5000], false), Ok(2000));
assert_eq!(pipe.server.stream_send(12, &buf[..5000], false), Ok(0));
assert_eq!(pipe.server.tx_cap, 0);
assert_eq!(pipe.advance(), Ok(()));
}
}
pub use crate::packet::ConnectionId;
pub use crate::packet::Header;
pub use crate::packet::Type;
pub use crate::recovery::CongestionControlAlgorithm;
pub use crate::stream::StreamIter;
mod crypto;
mod dgram;
#[cfg(feature = "ffi")]
mod ffi;
mod frame;
pub mod h3;
mod minmax;
mod octets;
mod packet;
mod rand;
mod ranges;
mod recovery;
mod stream;
mod tls;