#![warn(missing_docs)]
#[macro_use]
extern crate log;
use std::cmp;
use std::time;
/// The QUIC protocol version this crate selects when it processes a Version
/// Negotiation packet.
pub const PROTOCOL_VERSION: u32 = 0xff00_0014;
/// The maximum length of a connection ID, taken from `packet::MAX_CID_LEN`.
pub const MAX_CONN_ID_LEN: usize = crate::packet::MAX_CID_LEN as usize;
/// The size that client Initial packets are padded towards when sending.
pub const MIN_CLIENT_INITIAL_LEN: usize = 1200;
// Smallest packet payload emitted by `send()`; shorter payloads get padded.
const PAYLOAD_MIN_LEN: usize = 4;
// Multiplier applied to the size of received packets to compute how many
// bytes may be sent while the peer's address is not yet verified.
const MAX_AMPLIFICATION_FACTOR: usize = 3;
/// A specialized `Result` type whose error type is this crate's `Error`.
pub type Result<T> = std::result::Result<T, Error>;
/// An error reported by this crate.
///
/// The explicit negative discriminants are the values returned by `to_c()`.
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(C)]
pub enum Error {
/// There is nothing else to do.
Done = -1,
/// The provided buffer is too short.
BufferTooShort = -2,
/// The packet's version is unknown.
UnknownVersion = -3,
/// The provided frame is invalid.
InvalidFrame = -4,
/// The provided packet is invalid.
InvalidPacket = -5,
/// The connection state is invalid for the requested operation.
InvalidState = -6,
/// The stream state is invalid for the requested operation.
InvalidStreamState = -7,
/// A transport parameter is invalid.
InvalidTransportParam = -8,
/// A cryptographic operation failed.
CryptoFail = -9,
/// The TLS handshake failed.
TlsFail = -10,
/// A flow control limit was violated.
FlowControl = -11,
/// A stream limit was violated.
StreamLimit = -12,
/// Received data exceeded the stream's final size.
FinalSize = -13,
}
impl Error {
fn to_wire(self) -> u16 {
match self {
Error::Done => 0x0,
Error::InvalidFrame => 0x7,
Error::InvalidStreamState => 0x5,
Error::InvalidTransportParam => 0x8,
Error::CryptoFail => 0x100,
Error::TlsFail => 0x100,
Error::FlowControl => 0x3,
Error::StreamLimit => 0x4,
Error::FinalSize => 0x6,
_ => 0xa,
}
}
fn to_c(self) -> libc::ssize_t {
self as _
}
}
impl std::fmt::Display for Error {
    /// Formats the error using its `Debug` representation (the variant name).
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl std::error::Error for Error {
    /// Returns a short, static, human-readable description of the error.
    fn description(&self) -> &str {
        match *self {
            Error::Done => "nothing else to do",

            Error::BufferTooShort => "buffer is too short",

            Error::UnknownVersion => "version is unknown",

            Error::InvalidFrame => "frame is invalid",

            Error::InvalidPacket => "packet is invalid",

            Error::InvalidState => "connection state is invalid",

            Error::InvalidStreamState => "stream state is invalid",

            Error::InvalidTransportParam => "transport parameter is invalid",

            Error::CryptoFail => "crypto operation failed",

            Error::TlsFail => "TLS failed",

            Error::FlowControl => "flow control limit was violated",

            Error::StreamLimit => "stream limit was violated",

            Error::FinalSize => "data exceeded stream's final size",
        }
    }

    /// These errors never wrap an underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        None
    }
}
impl std::convert::From<octets::BufferTooShortError> for Error {
    /// Converts any octets-level "buffer too short" failure into
    /// `Error::BufferTooShort`, which lets `?` be used on octets operations.
    fn from(_: octets::BufferTooShortError) -> Self {
        Error::BufferTooShort
    }
}
/// The side of a stream to shut down, as passed to
/// `Connection::stream_shutdown()`.
#[repr(C)]
pub enum Shutdown {
/// Shut down the receiving side of the stream.
Read = 0,
/// Shut down the sending side of the stream.
Write = 1,
}
/// Stores configuration shared between multiple connections.
pub struct Config {
// Transport parameters copied into every connection created from this config.
local_transport_params: TransportParams,
// QUIC version that new connections start with.
version: u32,
// TLS context from which each connection's handshake state is created.
tls_ctx: tls::Context,
// Application protocols (ALPN), one byte string per protocol.
application_protos: Vec<Vec<u8>>,
// Copied into each connection's `grease` flag; enabled by default.
grease: bool,
}
impl Config {
/// Creates a config object for the given QUIC `version`.
///
/// Transport parameters start at their defaults, no application protocols
/// are configured, and greasing is enabled. Returns `Error::TlsFail` if the
/// TLS context cannot be created.
pub fn new(version: u32) -> Result<Config> {
let tls_ctx = tls::Context::new().map_err(|_| Error::TlsFail)?;
Ok(Config {
local_transport_params: TransportParams::default(),
version,
tls_ctx,
application_protos: Vec::new(),
grease: true,
})
}
/// Configures the certificate chain from the given file.
///
/// Any failure reported by the TLS context is returned as `Error::TlsFail`.
pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx
.use_certificate_chain_file(file)
.map_err(|_| Error::TlsFail)
}
/// Configures the private key from the given file.
///
/// Any failure reported by the TLS context is returned as `Error::TlsFail`.
pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx
.use_privkey_file(file)
.map_err(|_| Error::TlsFail)
}
/// Configures whether the TLS context verifies the peer.
pub fn verify_peer(&mut self, verify: bool) {
self.tls_ctx.set_verify(verify);
}
/// Sets the `grease` flag copied into new connections.
pub fn grease(&mut self, grease: bool) {
self.grease = grease;
}
/// Enables key logging on the TLS context.
pub fn log_keys(&mut self) {
self.tls_ctx.enable_keylog();
}
/// Configures the list of supported application protocols.
///
/// `protos` is parsed as a sequence of byte strings, each prefixed by a
/// one-byte length. Parsing stops silently at the first entry that cannot be
/// read in full, so malformed trailing data is ignored rather than reported.
/// The parsed list replaces any previous one and is handed to the TLS
/// context; a failure there is returned as `Error::TlsFail`.
pub fn set_application_protos(&mut self, protos: &[u8]) -> Result<()> {
// Work on a private copy because `Octets` needs a mutable slice.
let mut protos = protos.to_vec();
let mut b = octets::Octets::with_slice(&mut protos);
let mut protos_list = Vec::new();
while let Ok(proto) = b.get_bytes_with_u8_length() {
protos_list.push(proto.to_vec());
}
self.application_protos = protos_list;
self.tls_ctx
.set_alpn(&self.application_protos)
.map_err(|_| Error::TlsFail)
}
/// Sets the `idle_timeout` transport parameter, in milliseconds.
///
/// A value of 0 (the default) means the idle timer is never armed.
pub fn set_idle_timeout(&mut self, v: u64) {
self.local_transport_params.idle_timeout = v;
}
/// Sets the `stateless_reset_token` transport parameter.
pub fn set_stateless_reset_token(&mut self, v: &[u8; 16]) {
self.local_transport_params.stateless_reset_token = Some(v.to_vec());
}
/// Sets the `max_packet_size` transport parameter.
pub fn set_max_packet_size(&mut self, v: u64) {
self.local_transport_params.max_packet_size = v;
}
/// Sets the `initial_max_data` transport parameter, which is also the
/// initial connection-level receive limit of new connections.
pub fn set_initial_max_data(&mut self, v: u64) {
self.local_transport_params.initial_max_data = v;
}
/// Sets the `initial_max_stream_data_bidi_local` transport parameter.
pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_local = v;
}
/// Sets the `initial_max_stream_data_bidi_remote` transport parameter.
pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_remote = v;
}
/// Sets the `initial_max_stream_data_uni` transport parameter.
pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_stream_data_uni = v;
}
/// Sets the `initial_max_streams_bidi` transport parameter.
pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_bidi = v;
}
/// Sets the `initial_max_streams_uni` transport parameter.
pub fn set_initial_max_streams_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_uni = v;
}
/// Sets the `ack_delay_exponent` transport parameter.
///
/// The value is used as a power-of-two exponent when encoding ACK delays.
pub fn set_ack_delay_exponent(&mut self, v: u64) {
self.local_transport_params.ack_delay_exponent = v;
}
/// Sets the `max_ack_delay` transport parameter.
pub fn set_max_ack_delay(&mut self, v: u64) {
self.local_transport_params.max_ack_delay = v;
}
/// Sets the `disable_migration` transport parameter.
pub fn set_disable_migration(&mut self, v: bool) {
self.local_transport_params.disable_migration = v;
}
}
/// A QUIC connection.
pub struct Connection {
// QUIC version in use; replaced after a successful version negotiation.
version: u32,
// Destination connection ID written into outgoing packet headers.
dcid: Vec<u8>,
// Source connection ID written into outgoing packet headers.
scid: Vec<u8>,
// Hex encoding of the initial source connection ID; prefixes log lines.
trace_id: String,
// Per-epoch packet number space state (keys, ACK tracking, crypto stream).
pkt_num_spaces: [packet::PktNumSpace; packet::EPOCH_COUNT],
// Peer's transport parameters, decoded once the handshake completes.
peer_transport_params: TransportParams,
// Local transport parameters, copied from the `Config`.
local_transport_params: TransportParams,
// TLS handshake state.
handshake: tls::Handshake,
// Loss detection and congestion control state.
recovery: recovery::Recovery,
// Application protocols copied from the `Config`.
application_protos: Vec<Vec<u8>>,
// Number of packets successfully processed by `recv_single()`.
recv_count: usize,
// Number of packets produced by `send()`.
sent_count: usize,
// Stream bytes received so far, checked against `max_rx_data`.
rx_data: usize,
// Connection-level receive limit currently in force.
max_rx_data: usize,
// Next receive limit to advertise; grows as the application reads data.
max_rx_data_next: usize,
// Stream bytes sent so far, checked against `max_tx_data`.
tx_data: usize,
// Connection-level send limit granted by the peer.
max_tx_data: usize,
// Bytes a server may still send before the peer's address is verified.
max_send_bytes: usize,
// All streams known to the connection.
streams: stream::StreamMap,
// Destination connection ID in use before a Retry packet was processed.
odcid: Option<Vec<u8>>,
// Token received in a Retry packet, echoed in outgoing headers.
token: Option<Vec<u8>>,
// Application-epoch frames received before the handshake completed.
early_app_frames: Vec<frame::Frame>,
// Number of application packets received before the handshake completed.
early_app_pkts: usize,
// Transport error code to send in a ConnectionClose frame, if any.
error: Option<u16>,
// Application error code to send in an ApplicationClose frame, if any.
app_error: Option<u16>,
// Reason phrase sent along with `app_error`.
app_reason: Vec<u8>,
// Data of a received PathChallenge awaiting a PathResponse.
challenge: Option<Vec<u8>>,
// Deadline after which the connection is closed for being idle.
idle_timer: Option<time::Instant>,
// Deadline after which a closing connection is marked as closed.
draining_timer: Option<time::Instant>,
// Whether this endpoint is the server side of the connection.
is_server: bool,
// Whether Initial packet protection keys have been derived.
derived_initial_secrets: bool,
// Whether a Version Negotiation packet has already been processed.
did_version_negotiation: bool,
// Whether a Retry packet has already been processed.
did_retry: bool,
// Whether the peer's connection ID has been learned from a packet.
got_peer_conn_id: bool,
// Whether the peer's address is considered verified (lifts the send limit).
verified_peer_address: bool,
// Whether the TLS handshake has completed.
handshake_completed: bool,
// Whether the connection is closed (a draining or idle timeout expired).
closed: bool,
// Copied from the `Config`'s `grease` flag.
grease: bool,
}
/// Creates a new server-side connection.
///
/// `scid` is the connection's source connection ID. `odcid`, when given, is
/// recorded as the original destination connection ID in the local transport
/// parameters and makes the peer's address count as already verified.
pub fn accept(
    scid: &[u8], odcid: Option<&[u8]>, config: &mut Config,
) -> Result<Box<Connection>> {
    Connection::new(scid, odcid, config, true)
}
/// Creates a new client-side connection.
///
/// `scid` is the connection's source connection ID. When `server_name` is
/// provided it is passed to the TLS handshake as the host name; a failure
/// doing so is reported as `Error::TlsFail`.
pub fn connect(
    server_name: Option<&str>, scid: &[u8], config: &mut Config,
) -> Result<Box<Connection>> {
    let conn = Connection::new(scid, None, config, false)?;

    if let Some(host) = server_name {
        conn.handshake
            .set_host_name(host)
            .map_err(|_| Error::TlsFail)?;
    }

    Ok(conn)
}
/// Writes a version negotiation packet into `out`.
///
/// This forwards to `packet::negotiate_version()` with the given source and
/// destination connection IDs and returns its result (a `usize`, presumably
/// the number of bytes written to `out`).
pub fn negotiate_version(
scid: &[u8], dcid: &[u8], out: &mut [u8],
) -> Result<usize> {
packet::negotiate_version(scid, dcid, out)
}
/// Writes a retry packet into `out`.
///
/// This forwards to `packet::retry()` with the given connection IDs and token
/// and returns its result (a `usize`, presumably the number of bytes written
/// to `out`).
pub fn retry(
scid: &[u8], dcid: &[u8], new_scid: &[u8], token: &[u8], out: &mut [u8],
) -> Result<usize> {
packet::retry(scid, dcid, new_scid, token, out)
}
impl Connection {
// Creates a connection with a fresh TLS handshake taken from the config's TLS
// context. Returns `Error::TlsFail` if the handshake object cannot be created.
fn new(
scid: &[u8], odcid: Option<&[u8]>, config: &mut Config, is_server: bool,
) -> Result<Box<Connection>> {
let tls = config.tls_ctx.new_handshake().map_err(|_| Error::TlsFail)?;
Connection::with_tls(scid, odcid, config, tls, is_server)
}
/// Creates a connection that uses the provided TLS handshake object.
///
/// All counters start at zero and the receive limit is taken from the
/// config's `initial_max_data`. For clients a random 16-byte destination
/// connection ID is generated and the Initial keys are derived from it right
/// away; servers derive them when the first packet is received.
#[doc(hidden)]
pub fn with_tls(
scid: &[u8], odcid: Option<&[u8]>, config: &mut Config,
tls: tls::Handshake, is_server: bool,
) -> Result<Box<Connection>> {
let max_rx_data = config.local_transport_params.initial_max_data;
// The trace ID is the lowercase hex encoding of the source connection ID.
let scid_as_hex: Vec<String> =
scid.iter().map(|b| format!("{:02x}", b)).collect();
let mut conn = Box::new(Connection {
version: config.version,
dcid: Vec::new(),
scid: scid.to_vec(),
trace_id: scid_as_hex.join(""),
pkt_num_spaces: [
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
],
peer_transport_params: TransportParams::default(),
local_transport_params: config.local_transport_params.clone(),
handshake: tls,
recovery: recovery::Recovery::default(),
application_protos: config.application_protos.clone(),
recv_count: 0,
sent_count: 0,
rx_data: 0,
max_rx_data: max_rx_data as usize,
max_rx_data_next: max_rx_data as usize,
tx_data: 0,
max_tx_data: 0,
max_send_bytes: 0,
streams: stream::StreamMap::default(),
odcid: None,
token: None,
early_app_frames: Vec::new(),
early_app_pkts: 0,
error: None,
app_error: None,
app_reason: Vec::new(),
challenge: None,
idle_timer: None,
draining_timer: None,
is_server,
derived_initial_secrets: false,
did_version_negotiation: false,
did_retry: false,
got_peer_conn_id: false,
// Supplying an original destination connection ID marks the peer's
// address as already verified.
verified_peer_address: odcid.is_some(),
handshake_completed: false,
closed: false,
grease: config.grease,
});
if let Some(odcid) = odcid {
conn.local_transport_params.original_connection_id =
Some(odcid.to_vec());
}
// The handshake is initialized only after the local transport parameters
// are final, since it is given access to the whole connection.
conn.handshake.init(&conn).map_err(|_| Error::TlsFail)?;
conn.streams.update_local_max_streams_bidi(
config.local_transport_params.initial_max_streams_bidi as usize,
);
conn.streams.update_local_max_streams_uni(
config.local_transport_params.initial_max_streams_uni as usize,
);
if !is_server {
// Clients pick a random destination connection ID and derive the
// Initial packet protection keys from it.
let mut dcid = [0; 16];
rand::rand_bytes(&mut dcid[..]);
let (aead_open, aead_seal) =
crypto::derive_initial_key_material(&dcid, conn.is_server)?;
conn.dcid.extend_from_slice(&dcid);
conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
conn.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
conn.derived_initial_secrets = true;
}
Ok(conn)
}
/// Processes QUIC packets received from the peer.
///
/// `buf` may hold several coalesced packets; they are processed one after
/// another until the buffer is exhausted. On success the number of bytes
/// processed is returned. `Error::Done` is passed through unchanged; any
/// other error additionally schedules the connection to be closed with the
/// matching wire error code before being returned.
pub fn recv(&mut self, buf: &mut [u8]) -> Result<usize> {
    let len = buf.len();
    let mut remaining = len;

    while remaining != 0 {
        let start = len - remaining;

        match self.recv_single(&mut buf[start..]) {
            Ok(read) => remaining -= read,

            Err(Error::Done) => return Err(Error::Done),

            Err(e) => {
                // The close may fail if one is already pending; the original
                // error is what the caller needs to see either way.
                self.close(false, e.to_wire(), b"").ok();
                return Err(e);
            },
        }
    }

    Ok(len - remaining)
}
// Processes a single QUIC packet found at the start of `buf`.
//
// Returns the number of bytes consumed so the caller can continue with any
// coalesced packet that follows. `Error::Done` is returned when the packet is
// ignored (connection closing or draining, duplicates, Version Negotiation
// and Retry packets, ...).
fn recv_single(&mut self, buf: &mut [u8]) -> Result<usize> {
let now = time::Instant::now();
if buf.is_empty() {
return Err(Error::BufferTooShort);
}
// Nothing is processed once the connection is draining or closing.
if self.draining_timer.is_some() {
return Err(Error::Done);
}
let is_closing = self.error.is_some() || self.app_error.is_some();
if is_closing {
return Err(Error::Done);
}
let mut b = octets::Octets::with_slice(buf);
let mut hdr = Header::from_bytes(&mut b, self.scid.len())?;
if hdr.ty == packet::Type::VersionNegotiation {
// Version Negotiation is only honoured by clients, at most once, and
// only when both connection IDs match the ones in use.
if self.is_server {
return Err(Error::Done);
}
if self.did_version_negotiation {
return Err(Error::Done);
}
if hdr.dcid != self.scid {
return Err(Error::Done);
}
if hdr.scid != self.dcid {
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
let versions = match hdr.versions {
Some(ref v) => v,
None => return Err(Error::InvalidPacket),
};
// Look for the one version this implementation supports.
let mut new_version = 0;
for v in versions.iter() {
if *v == PROTOCOL_VERSION {
new_version = *v;
}
}
if new_version == 0 {
return Err(Error::UnknownVersion);
}
self.version = new_version;
self.did_version_negotiation = true;
// Reset Initial state so the handshake restarts with the new version.
self.got_peer_conn_id = false;
self.recovery.drop_unacked_data(packet::EPOCH_INITIAL);
self.pkt_num_spaces[packet::EPOCH_INITIAL].clear();
self.handshake.clear().map_err(|_| Error::TlsFail)?;
return Err(Error::Done);
}
if hdr.ty == packet::Type::Retry {
// Retry is only honoured by clients, at most once, and only when it
// echoes the destination connection ID currently in use.
if self.is_server {
return Err(Error::Done);
}
if self.did_retry {
return Err(Error::Done);
}
if hdr.odcid.as_ref() != Some(&self.dcid) {
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
self.token = hdr.token;
self.did_retry = true;
// Remember the original destination connection ID, then switch to the
// connection ID chosen by the server.
self.odcid = Some(self.dcid.clone());
self.dcid.resize(hdr.scid.len(), 0);
self.dcid.copy_from_slice(&hdr.scid);
// Initial keys depend on the destination connection ID, so they are
// derived again.
let (aead_open, aead_seal) =
crypto::derive_initial_key_material(&hdr.scid, self.is_server)?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
// Reset Initial state so the handshake restarts.
self.got_peer_conn_id = false;
self.recovery.drop_unacked_data(packet::EPOCH_INITIAL);
self.pkt_num_spaces[packet::EPOCH_INITIAL].clear();
self.handshake.clear().map_err(|_| Error::TlsFail)?;
return Err(Error::Done);
}
if hdr.ty != packet::Type::Application && hdr.version != self.version {
return Err(Error::UnknownVersion);
}
// Application packets extend to the end of the buffer; all other types
// carry an explicit length.
let payload_len = if hdr.ty == packet::Type::Application {
b.cap()
} else {
b.get_varint()? as usize
};
if b.cap() < payload_len {
return Err(Error::BufferTooShort);
}
let header_len = b.off();
// A client adopts the source connection ID of the first packet it
// processes as its destination connection ID.
if !self.is_server && !self.got_peer_conn_id {
self.dcid.resize(hdr.scid.len(), 0);
self.dcid.copy_from_slice(&hdr.scid);
self.got_peer_conn_id = true;
}
// Clients derive Initial keys at construction time, so this branch is
// taken for the first packet seen by a server.
if !self.derived_initial_secrets {
let (aead_open, aead_seal) =
crypto::derive_initial_key_material(&hdr.dcid, self.is_server)?;
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_open =
Some(aead_open);
self.pkt_num_spaces[packet::EPOCH_INITIAL].crypto_seal =
Some(aead_seal);
self.derived_initial_secrets = true;
self.dcid.extend_from_slice(&hdr.scid);
self.got_peer_conn_id = true;
}
let epoch = hdr.ty.to_epoch()?;
// Without keys for this epoch the packet is skipped: its full length is
// reported as consumed so that following packets can still be processed.
let aead = match self.pkt_num_spaces[epoch].crypto_open {
Some(ref v) => v,
None => {
trace!(
"{} dropped undecryptable packet type={:?} len={}",
self.trace_id,
hdr.ty,
payload_len
);
return Ok(header_len + payload_len);
},
};
let aead_tag_len = aead.alg().tag_len();
packet::decrypt_hdr(&mut b, &mut hdr, &aead)?;
let pn = packet::decode_pkt_num(
self.pkt_num_spaces[epoch].largest_rx_pkt_num,
hdr.pkt_num,
hdr.pkt_num_len,
);
trace!(
"{} rx pkt {:?} len={} pn={}",
self.trace_id,
hdr,
payload_len,
pn
);
// Packets that fail decryption are skipped rather than treated as fatal.
let mut payload = match packet::decrypt_pkt(
&mut b,
pn,
hdr.pkt_num_len,
payload_len,
&aead,
) {
Ok(v) => v,
Err(Error::CryptoFail) => {
trace!(
"{} dropped undecryptable packet type={:?} len={}",
self.trace_id,
hdr.ty,
payload_len,
);
return Ok(header_len + payload_len);
},
Err(e) => return Err(e),
};
if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
trace!("{} ignored duplicate packet {}", self.trace_id, pn);
return Err(Error::Done);
}
// Only a bounded number of application packets is accepted before the
// handshake completes; the rest are skipped.
if hdr.ty == packet::Type::Application && !self.is_established() {
self.early_app_pkts += 1;
if self.early_app_pkts > recovery::INITIAL_WINDOW_PACKETS {
error!(
"{} dropped early application packet len={} pn={}",
self.trace_id, payload_len, pn,
);
return Ok(header_len + payload_len);
}
}
// Until the peer's address is verified, each received packet grants a
// send budget proportional to its size.
if !self.verified_peer_address {
self.max_send_bytes +=
(header_len + payload_len) * MAX_AMPLIFICATION_FACTOR;
}
let mut ack_elicited = false;
while payload.cap() > 0 {
let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
if frame.ack_eliciting() {
ack_elicited = true;
}
// Application frames that arrive before the handshake completes are
// buffered; `do_handshake()` processes them once it completes.
if hdr.ty == packet::Type::Application && !self.is_established() {
self.early_app_frames.push(frame);
continue;
}
self.process_frame(frame, epoch, now)?;
}
// React to frames the recovery module reported as newly acknowledged.
for acked in self.recovery.acked[epoch].drain(..) {
match acked {
frame::Frame::ACK { ranges, .. } => {
// The peer has seen this ACK, so the packet numbers it covered
// no longer need to be acknowledged.
let largest_acked = ranges.largest().unwrap();
self.pkt_num_spaces[epoch]
.recv_pkt_need_ack
.remove_until(largest_acked);
},
frame::Frame::Crypto { data } => {
self.pkt_num_spaces[epoch]
.crypto_stream
.send
.ack(data.off(), data.len());
},
frame::Frame::Stream { stream_id, data } => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
stream.send.ack(data.off(), data.len());
},
_ => (),
}
}
// Record the arrival time when this packet becomes the largest one
// awaiting acknowledgement; `send()` derives the ACK delay from it.
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.largest() < Some(pn) {
self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
}
self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);
self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);
self.pkt_num_spaces[epoch].ack_elicited =
cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);
self.pkt_num_spaces[epoch].largest_rx_pkt_num =
cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);
// Every processed packet re-arms the idle timer (when one is configured).
if self.local_transport_params.idle_timeout > 0 {
self.idle_timer = Some(
now + time::Duration::from_millis(
self.local_transport_params.idle_timeout,
),
);
}
self.recv_count += 1;
let read = b.off() + aead_tag_len;
// Once a server processes a Handshake packet it discards its Initial
// state and treats the peer's address as verified.
if self.is_server && hdr.ty == packet::Type::Handshake {
self.drop_initial_state();
self.verified_peer_address = true;
}
Ok(read)
}
pub fn send(&mut self, out: &mut [u8]) -> Result<usize> {
let now = time::Instant::now();
if out.is_empty() {
return Err(Error::BufferTooShort);
}
if self.draining_timer.is_some() {
return Err(Error::Done);
}
let is_closing = self.error.is_some() || self.app_error.is_some();
if !is_closing {
self.do_handshake(now)?;
}
let max_pkt_len = if self.handshake_completed {
cmp::min(16383, self.peer_transport_params.max_packet_size) as usize
} else {
1200
};
let avail = cmp::min(max_pkt_len, out.len());
let mut b = octets::Octets::with_slice(&mut out[..avail]);
let epoch = self.write_epoch()?;
let pkt_type = packet::Type::from_epoch(epoch);
for lost in self.recovery.lost[epoch].drain(..) {
match lost {
frame::Frame::Crypto { data } => {
self.pkt_num_spaces[epoch].crypto_stream.send.push(data)?;
},
frame::Frame::Stream { stream_id, data } => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
self.tx_data -= data.len();
stream.send.push(data)?;
},
frame::Frame::ACK { .. } => {
self.pkt_num_spaces[epoch].ack_elicited = true;
},
_ => (),
}
}
let mut left = cmp::min(self.recovery.cwnd(), b.cap());
if !self.verified_peer_address && self.is_server {
left = cmp::min(left, self.max_send_bytes);
}
let pn = self.pkt_num_spaces[epoch].next_pkt_num;
let pn_len = packet::pkt_num_len(pn)?;
let overhead = self.pkt_num_spaces[epoch].overhead();
let hdr = Header {
ty: pkt_type,
version: self.version,
dcid: self.dcid.clone(),
scid: self.scid.clone(),
pkt_num: 0,
pkt_num_len: pn_len,
odcid: None,
token: self.token.clone(),
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b)?;
left = left
.checked_sub(b.off() + 2 + pn_len + overhead)
.ok_or(Error::Done)?;
let mut frames: Vec<frame::Frame> = Vec::new();
let mut ack_eliciting = false;
let mut in_flight = false;
let mut is_crypto = false;
let mut payload_len = 0;
if self.pkt_num_spaces[epoch].ack_elicited {
let ack_delay =
self.pkt_num_spaces[epoch].largest_rx_pkt_time.elapsed();
let ack_delay = ack_delay.as_micros() as u64 /
2_u64
.pow(self.local_transport_params.ack_delay_exponent as u32);
let frame = frame::Frame::ACK {
ack_delay,
ranges: self.pkt_num_spaces[epoch].recv_pkt_need_ack.clone(),
};
if frame.wire_len() <= left {
self.pkt_num_spaces[epoch].ack_elicited = false;
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
}
}
if pkt_type == packet::Type::Application &&
!is_closing &&
(self.max_rx_data_next != self.max_rx_data &&
self.max_rx_data_next / 2 > self.max_rx_data - self.rx_data)
{
let frame = frame::Frame::MaxData {
max: self.max_rx_data_next as u64,
};
if frame.wire_len() <= left {
self.max_rx_data = self.max_rx_data_next;
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
ack_eliciting = true;
in_flight = true;
}
}
if pkt_type == packet::Type::Application && !is_closing {
for (id, stream) in self
.streams
.iter_mut()
.filter(|(_, s)| s.recv.more_credit())
{
let frame = frame::Frame::MaxStreamData {
stream_id: *id,
max: stream.recv.update_max_data() as u64,
};
if frame.wire_len() > left {
break;
}
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
ack_eliciting = true;
in_flight = true;
}
}
if self.recovery.probes > 0 && left >= 1 {
let frame = frame::Frame::Ping;
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
self.recovery.probes -= 1;
ack_eliciting = true;
in_flight = true;
}
if let Some(err) = self.error {
let frame = frame::Frame::ConnectionClose {
error_code: err,
frame_type: 0,
reason: Vec::new(),
};
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
self.draining_timer = Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
if let Some(err) = self.app_error {
if pkt_type == packet::Type::Application {
let frame = frame::Frame::ApplicationClose {
error_code: err,
reason: self.app_reason.clone(),
};
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
self.draining_timer = Some(now + (self.recovery.pto() * 3));
ack_eliciting = true;
in_flight = true;
}
}
if let Some(ref challenge) = self.challenge {
let frame = frame::Frame::PathResponse {
data: challenge.clone(),
};
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
self.challenge = None;
ack_eliciting = true;
in_flight = true;
}
if self.pkt_num_spaces[epoch].crypto_stream.writable() && !is_closing {
let crypto_len = left - frame::MAX_CRYPTO_OVERHEAD;
let crypto_buf = self.pkt_num_spaces[epoch]
.crypto_stream
.send
.pop(crypto_len)?;
let frame = frame::Frame::Crypto { data: crypto_buf };
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
ack_eliciting = true;
in_flight = true;
is_crypto = true;
}
if pkt_type == packet::Type::Application &&
!is_closing &&
self.max_tx_data > self.tx_data &&
left > frame::MAX_STREAM_OVERHEAD
{
for (id, stream) in
self.streams.iter_mut().filter(|(_, s)| s.writable())
{
let stream_len = cmp::min(
left - frame::MAX_STREAM_OVERHEAD,
self.max_tx_data - self.tx_data,
);
let stream_buf = stream.send.pop(stream_len)?;
if stream_buf.is_empty() {
continue;
}
self.tx_data += stream_buf.len();
let frame = frame::Frame::Stream {
stream_id: *id,
data: stream_buf,
};
payload_len += frame.wire_len();
left -= frame.wire_len();
frames.push(frame);
ack_eliciting = true;
in_flight = true;
break;
}
}
if frames.is_empty() {
return Err(Error::Done);
}
if !self.is_server && pkt_type == packet::Type::Initial {
let pkt_len = pn_len + payload_len + overhead;
let frame = frame::Frame::Padding {
len: cmp::min(MIN_CLIENT_INITIAL_LEN - pkt_len, left),
};
payload_len += frame.wire_len();
frames.push(frame);
in_flight = true;
}
if payload_len < PAYLOAD_MIN_LEN {
let frame = frame::Frame::Padding {
len: PAYLOAD_MIN_LEN - payload_len,
};
payload_len += frame.wire_len();
frames.push(frame);
in_flight = true;
}
payload_len += overhead;
if pkt_type != packet::Type::Application {
let len = pn_len + payload_len;
b.put_varint(len as u64)?;
}
packet::encode_pkt_num(pn, &mut b)?;
let payload_offset = b.off();
trace!(
"{} tx pkt {:?} len={} pn={}",
self.trace_id,
hdr,
payload_len,
pn
);
for frame in &frames {
trace!("{} tx frm {:?}", self.trace_id, frame);
frame.to_bytes(&mut b)?;
}
let aead = match self.pkt_num_spaces[epoch].crypto_seal {
Some(ref v) => v,
None => return Err(Error::InvalidState),
};
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
aead,
)?;
let sent_pkt = recovery::Sent {
pkt_num: pn,
frames,
time: now,
size: if ack_eliciting { written } else { 0 },
ack_eliciting,
in_flight,
is_crypto,
};
self.recovery
.on_packet_sent(sent_pkt, epoch, now, &self.trace_id);
self.pkt_num_spaces[epoch].next_pkt_num += 1;
self.sent_count += 1;
if !self.is_server && hdr.ty == packet::Type::Handshake {
self.drop_initial_state();
}
self.max_send_bytes = self.max_send_bytes.saturating_sub(written);
Ok(written)
}
pub fn stream_recv(
&mut self, stream_id: u64, out: &mut [u8],
) -> Result<(usize, bool)> {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => return Err(Error::InvalidStreamState),
};
if !stream.readable() {
return Err(Error::Done);
}
let (read, fin) = stream.recv.pop(out)?;
self.max_rx_data_next = self.max_rx_data_next.saturating_add(read);
Ok((read, fin))
}
/// Queues data to be sent on a stream, creating the stream if needed.
///
/// On success returns `buf.len()`. Returns `Error::InvalidStreamState` for
/// unidirectional streams initiated by the peer; errors from creating the
/// stream or from its send buffer are passed through.
pub fn stream_send(
    &mut self, stream_id: u64, buf: &[u8], fin: bool,
) -> Result<usize> {
    // Peer-initiated unidirectional streams are receive-only.
    let is_remote_uni = !stream::is_bidi(stream_id) &&
        !stream::is_local(stream_id, self.is_server);

    if is_remote_uni {
        return Err(Error::InvalidStreamState);
    }

    // Limits applied if the stream has to be created.
    let local = &self.local_transport_params;
    let rx_limit = local.initial_max_stream_data_bidi_local as usize;

    let peer = &self.peer_transport_params;
    let tx_limit = peer.initial_max_stream_data_bidi_remote as usize;

    let stream = self.streams.get_or_create(
        stream_id,
        rx_limit,
        tx_limit,
        true,
        self.is_server,
    )?;

    stream.send.push_slice(buf, fin)?;

    Ok(buf.len())
}
pub fn stream_shutdown(
&mut self, stream_id: u64, direction: Shutdown, _err: u64,
) -> Result<()> {
let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
match direction {
Shutdown::Read => stream.recv.shutdown(),
Shutdown::Write => stream.send.shutdown(),
}
Ok(())
}
/// Returns true if the stream's receive side reports it is finished.
///
/// Streams that are not known to the connection are reported as finished.
pub fn stream_finished(&self, stream_id: u64) -> bool {
    self.streams
        .get(stream_id)
        .map_or(true, |stream| stream.recv.is_fin())
}
/// Returns the stream map's `Readable` value, which describes the streams
/// that currently have data to read.
pub fn readable(&mut self) -> Readable {
self.streams.readable()
}
/// Returns the amount of time until the next timeout event, or `None` if no
/// timer is armed or the connection is closed.
///
/// Once the returned duration has elapsed, `on_timeout()` should be called.
/// A zero duration means the deadline has already passed.
///
/// While the connection is draining only the draining timer matters, which
/// mirrors `on_timeout()`. Otherwise the earliest of the loss detection and
/// idle deadlines is reported, so a far-away loss detection deadline cannot
/// delay the idle timeout.
pub fn timeout(&self) -> Option<std::time::Duration> {
    if self.closed {
        return None;
    }

    let deadline = if self.draining_timer.is_some() {
        self.draining_timer
    } else {
        let timers =
            [self.recovery.loss_detection_timer(), self.idle_timer];

        timers.iter().filter_map(|&t| t).min()
    };

    deadline.map(|deadline| {
        let now = time::Instant::now();

        if deadline <= now {
            std::time::Duration::new(0, 0)
        } else {
            deadline.duration_since(now)
        }
    })
}
/// Processes a timeout event.
///
/// Timers are checked in priority order and at most one is handled per
/// call: the draining timer (when armed, nothing else is considered), then
/// the idle timer, then the loss detection timer. Expiry of the draining or
/// idle timer marks the connection as closed.
pub fn on_timeout(&mut self) {
    let now = time::Instant::now();

    if let Some(deadline) = self.draining_timer {
        if deadline <= now {
            trace!("{} draining timeout expired", self.trace_id);

            self.closed = true;
        }

        // While draining, no other timer is serviced.
        return;
    }

    let idle_expired = self.idle_timer.map_or(false, |t| t <= now);

    if idle_expired {
        trace!("{} idle timeout expired", self.trace_id);

        self.closed = true;
        return;
    }

    let loss_expired = self
        .recovery
        .loss_detection_timer()
        .map_or(false, |t| t <= now);

    if loss_expired {
        trace!("{} loss detection timeout expired", self.trace_id);

        self.recovery.on_loss_detection_timeout(now, &self.trace_id);
    }
}
/// Closes the connection with the given error and reason.
///
/// When `app` is true the error is recorded as an application error
/// together with `reason`; otherwise it is recorded as a transport error
/// and `reason` is not stored. The close frame itself is written by a later
/// call to `send()`.
///
/// Returns `Error::Done` if the connection is already draining or a close
/// has already been requested.
pub fn close(&mut self, app: bool, err: u16, reason: &[u8]) -> Result<()> {
    let already_closing = self.draining_timer.is_some() ||
        self.error.is_some() ||
        self.app_error.is_some();

    if already_closing {
        return Err(Error::Done);
    }

    if !app {
        self.error = Some(err);
        return Ok(());
    }

    self.app_error = Some(err);
    self.app_reason.extend_from_slice(reason);

    Ok(())
}
/// Returns the string that identifies this connection in log messages.
pub fn trace_id(&self) -> &str {
&self.trace_id
}
/// Returns the application protocol (ALPN) reported by the TLS handshake.
///
/// The handshake treats an empty result as "no protocol negotiated".
pub fn application_proto(&self) -> &[u8] {
self.handshake.get_alpn_protocol()
}
/// Returns true if the connection handshake is complete.
pub fn is_established(&self) -> bool {
self.handshake_completed
}
/// Returns true if the TLS handshake reports that the session was resumed.
pub fn is_resumed(&self) -> bool {
self.handshake.is_resumed()
}
/// Returns true if the connection is closed, which happens when the draining
/// or idle timeout expires.
pub fn is_closed(&self) -> bool {
self.closed
}
/// Collects and returns a snapshot of the connection's statistics: packet
/// counters from the connection, and loss count, congestion window and RTT
/// from the recovery state.
pub fn stats(&self) -> Stats {
Stats {
recv: self.recv_count,
sent: self.sent_count,
lost: self.recovery.lost_count,
cwnd: self.recovery.cwnd(),
rtt: self.recovery.rtt(),
}
}
fn do_handshake(&mut self, now: time::Instant) -> Result<()> {
if !self.handshake_completed {
match self.handshake.do_handshake() {
Ok(_) => {
if self.application_proto().is_empty() {
self.error = Some(0x178);
return Err(Error::TlsFail);
}
self.handshake_completed = true;
let mut raw_params =
self.handshake.get_quic_transport_params().to_vec();
let peer_params =
TransportParams::decode(&mut raw_params, self.is_server)?;
if peer_params.original_connection_id != self.odcid {
return Err(Error::InvalidTransportParam);
}
self.max_tx_data = peer_params.initial_max_data as usize;
self.streams.update_peer_max_streams_bidi(
peer_params.initial_max_streams_bidi as usize,
);
self.streams.update_peer_max_streams_uni(
peer_params.initial_max_streams_uni as usize,
);
self.recovery.max_ack_delay =
time::Duration::from_millis(peer_params.max_ack_delay);
self.peer_transport_params = peer_params;
trace!("{} connection established: cipher={:?} proto={:?} resumed={} {:?}",
&self.trace_id,
self.handshake.cipher(),
std::str::from_utf8(self.application_proto()),
self.is_resumed(),
self.peer_transport_params);
for f in self
.early_app_frames
.drain(..)
.collect::<Vec<frame::Frame>>()
{
self.process_frame(f, packet::EPOCH_APPLICATION, now)?;
}
},
Err(tls::Error::TlsFail) => return Err(Error::TlsFail),
Err(tls::Error::SyscallFail) => return Err(Error::TlsFail),
Err(_) => (),
}
}
Ok(())
}
// Selects the epoch (packet number space) for the next outgoing packet.
//
// Transport close frames and probes are sent at the TLS stack's current
// write level. Otherwise the first epoch with pending or lost data is used;
// the application epoch is only considered once the handshake is complete.
// Returns `Error::Done` when nothing needs to be sent.
fn write_epoch(&self) -> Result<packet::Epoch> {
    let use_tls_write_level =
        self.error.is_some() || self.recovery.probes > 0;

    if use_tls_write_level {
        return Ok(match self.handshake.get_write_level() {
            crypto::Level::Initial => packet::EPOCH_INITIAL,
            crypto::Level::ZeroRTT => unreachable!(),
            crypto::Level::Handshake => packet::EPOCH_HANDSHAKE,
            crypto::Level::Application => packet::EPOCH_APPLICATION,
        });
    }

    let pending = (packet::EPOCH_INITIAL..packet::EPOCH_COUNT)
        .filter(|&epoch| {
            epoch != packet::EPOCH_APPLICATION || self.handshake_completed
        })
        .find(|&epoch| {
            self.pkt_num_spaces[epoch].ready() ||
                !self.recovery.lost[epoch].is_empty()
        });

    if let Some(epoch) = pending {
        return Ok(epoch);
    }

    // Stream data or flow control updates are waiting to be sent.
    let streams_need_send =
        self.streams.has_writable() || self.streams.has_out_of_credit();

    if self.handshake_completed && streams_need_send {
        return Ok(packet::EPOCH_APPLICATION);
    }

    Err(Error::Done)
}
fn process_frame(
&mut self, frame: frame::Frame, epoch: packet::Epoch,
now: std::time::Instant,
) -> Result<()> {
trace!("{} rx frm {:?}", self.trace_id, frame);
match frame {
frame::Frame::Padding { .. } => (),
frame::Frame::Ping => (),
frame::Frame::ACK { ranges, ack_delay } => {
let ack_delay = ack_delay *
2_u64.pow(
self.peer_transport_params.ack_delay_exponent as u32,
);
self.recovery.on_ack_received(
&ranges,
ack_delay,
epoch,
now,
&self.trace_id,
);
},
frame::Frame::ResetStream {
stream_id,
final_size,
..
} => {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let max_rx_data = self
.local_transport_params
.initial_max_stream_data_bidi_remote
as usize;
let max_tx_data = self
.peer_transport_params
.initial_max_stream_data_bidi_local
as usize;
let stream = self.streams.get_or_create(
stream_id,
max_rx_data,
max_tx_data,
false,
self.is_server,
)?;
self.rx_data += stream.recv.reset(final_size as usize)?;
if self.rx_data > self.max_rx_data {
return Err(Error::FlowControl);
}
},
frame::Frame::StopSending { stream_id, .. } => {
if !stream::is_local(stream_id, self.is_server) &&
!stream::is_bidi(stream_id)
{
return Err(Error::InvalidStreamState);
}
},
frame::Frame::Crypto { data } => {
self.pkt_num_spaces[epoch].crypto_stream.recv.push(data)?;
let mut crypto_buf = [0; 512];
let level = crypto::Level::from_epoch(epoch);
let stream = &mut self.pkt_num_spaces[epoch].crypto_stream;
while let Ok((read, _)) = stream.recv.pop(&mut crypto_buf) {
let recv_buf = &crypto_buf[..read];
self.handshake
.provide_data(level, &recv_buf)
.map_err(|_| Error::TlsFail)?;
}
self.do_handshake(now)?;
},
frame::Frame::NewToken { .. } => (),
frame::Frame::Stream { stream_id, data } => {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState);
}
let max_rx_data = self
.local_transport_params
.initial_max_stream_data_bidi_remote
as usize;
let max_tx_data = self
.peer_transport_params
.initial_max_stream_data_bidi_local
as usize;
let stream = self.streams.get_or_create(
stream_id,
max_rx_data,
max_tx_data,
false,
self.is_server,
)?;
self.rx_data += data.len();
if self.rx_data > self.max_rx_data {
return Err(Error::FlowControl);
}
stream.recv.push(data)?;
},
frame::Frame::MaxData { max } => {
self.max_tx_data = cmp::max(self.max_tx_data, max as usize);
},
frame::Frame::MaxStreamData { stream_id, max } => {
let max_rx_data = self
.local_transport_params
.initial_max_stream_data_bidi_remote
as usize;
let max_tx_data = self
.peer_transport_params
.initial_max_stream_data_bidi_local
as usize;
let stream = self.streams.get_or_create(
stream_id,
max_rx_data,
max_tx_data,
false,
self.is_server,
)?;
stream.send.update_max_data(max as usize);
},
frame::Frame::MaxStreamsBidi { max } => {
if max > 2u64.pow(60) {
return Err(Error::StreamLimit);
}
self.streams.update_peer_max_streams_bidi(max as usize);
},
frame::Frame::MaxStreamsUni { max } => {
if max > 2u64.pow(60) {
return Err(Error::StreamLimit);
}
self.streams.update_peer_max_streams_uni(max as usize);
},
frame::Frame::DataBlocked { .. } => (),
frame::Frame::StreamDataBlocked { .. } => (),
frame::Frame::StreamsBlockedBidi { .. } => (),
frame::Frame::StreamsBlockedUni { .. } => (),
frame::Frame::NewConnectionId { .. } => (),
frame::Frame::RetireConnectionId { .. } => (),
frame::Frame::PathChallenge { data } => {
self.challenge = Some(data);
},
frame::Frame::PathResponse { .. } => (),
frame::Frame::ConnectionClose { .. } => {
self.draining_timer = Some(now + (self.recovery.pto() * 3));
},
frame::Frame::ApplicationClose { .. } => {
self.draining_timer = Some(now + (self.recovery.pto() * 3));
},
}
Ok(())
}
/// Discards the state held for the Initial packet number space.
///
/// Returns immediately when the space's `crypto_open` is already `None`,
/// so repeated calls after the first one have no effect.
fn drop_initial_state(&mut self) {
    let initial = &mut self.pkt_num_spaces[packet::EPOCH_INITIAL];

    // A missing `crypto_open` means this has already run.
    if initial.crypto_open.is_none() {
        return;
    }

    initial.crypto_open = None;
    initial.crypto_seal = None;
    initial.clear();

    self.recovery.drop_unacked_data(packet::EPOCH_INITIAL);

    trace!("{} dropped initial state", self.trace_id);
}
}
/// Statistics about a connection.
#[derive(Clone)]
pub struct Stats {
    /// Receive counter (presumably the number of packets received on the
    /// connection; confirm against the code that fills this struct).
    pub recv: usize,

    /// Send counter (presumably the number of packets sent on the
    /// connection; confirm against the code that fills this struct).
    pub sent: usize,

    /// Loss counter (presumably the number of packets declared lost;
    /// confirm against the code that fills this struct).
    pub lost: usize,

    /// Round-trip time value reported for the connection.
    pub rtt: time::Duration,

    /// Congestion window value (unit not visible here; presumably bytes).
    pub cwnd: usize,
}

impl std::fmt::Debug for Stats {
    /// Formats every field as a space-separated `name=value` pair.
    ///
    /// `cwnd` is included so the debug output covers all public fields.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "recv={} sent={} lost={} rtt={:?} cwnd={}",
            self.recv, self.sent, self.lost, self.rtt, self.cwnd
        )
    }
}
/// Transport parameters exchanged between the two endpoints.
///
/// Each field corresponds to one 16-bit parameter id handled by
/// `TransportParams::decode()` and `TransportParams::encode()`.
#[derive(Clone, PartialEq)]
struct TransportParams {
    /// Parameter `0x0000`; only encoded, and only accepted, on the
    /// server-to-client direction.
    pub original_connection_id: Option<Vec<u8>>,
    /// Parameter `0x0001`.
    pub idle_timeout: u64,
    /// Parameter `0x0002`, decoded as exactly 16 bytes; only encoded, and
    /// only accepted, on the server-to-client direction.
    pub stateless_reset_token: Option<Vec<u8>>,
    /// Parameter `0x0003`.
    pub max_packet_size: u64,
    /// Parameter `0x0004`.
    pub initial_max_data: u64,
    /// Parameter `0x0005`.
    pub initial_max_stream_data_bidi_local: u64,
    /// Parameter `0x0006`.
    pub initial_max_stream_data_bidi_remote: u64,
    /// Parameter `0x0007`.
    pub initial_max_stream_data_uni: u64,
    /// Parameter `0x0008`; the decoder rejects values above 2^60.
    pub initial_max_streams_bidi: u64,
    /// Parameter `0x0009`; the decoder rejects values above 2^60.
    pub initial_max_streams_uni: u64,
    /// Parameter `0x000a`; the decoder rejects values above 20.
    pub ack_delay_exponent: u64,
    /// Parameter `0x000b`; the decoder rejects values of 2^14 or more.
    pub max_ack_delay: u64,
    /// Parameter `0x000c`; set when the (empty) parameter is present.
    pub disable_migration: bool,
}

impl Default for TransportParams {
    /// Returns the values assumed for parameters that are not received.
    ///
    /// Only `max_packet_size` (65527), `ack_delay_exponent` (3) and
    /// `max_ack_delay` (25) have non-zero defaults.
    fn default() -> Self {
        Self {
            // Optional byte-string parameters start out absent.
            original_connection_id: None,
            stateless_reset_token: None,

            // Parameters with non-zero defaults.
            max_packet_size: 65527,
            ack_delay_exponent: 3,
            max_ack_delay: 25,

            // Everything else defaults to zero / disabled.
            idle_timeout: 0,
            initial_max_data: 0,
            initial_max_stream_data_bidi_local: 0,
            initial_max_stream_data_bidi_remote: 0,
            initial_max_stream_data_uni: 0,
            initial_max_streams_bidi: 0,
            initial_max_streams_uni: 0,
            disable_migration: false,
        }
    }
}
impl TransportParams {
fn decode(buf: &mut [u8], is_server: bool) -> Result<TransportParams> {
let mut b = octets::Octets::with_slice(buf);
let mut tp = TransportParams::default();
let mut params = b.get_bytes_with_u16_length()?;
while params.cap() > 0 {
let id = params.get_u16()?;
let mut val = params.get_bytes_with_u16_length()?;
match id {
0x0000 => {
if is_server {
return Err(Error::InvalidTransportParam);
}
tp.original_connection_id = Some(val.to_vec());
},
0x0001 => {
tp.idle_timeout = val.get_varint()?;
},
0x0002 => {
if is_server {
return Err(Error::InvalidTransportParam);
}
tp.stateless_reset_token = Some(val.get_bytes(16)?.to_vec());
},
0x0003 => {
tp.max_packet_size = val.get_varint()?;
},
0x0004 => {
tp.initial_max_data = val.get_varint()?;
},
0x0005 => {
tp.initial_max_stream_data_bidi_local = val.get_varint()?;
},
0x0006 => {
tp.initial_max_stream_data_bidi_remote = val.get_varint()?;
},
0x0007 => {
tp.initial_max_stream_data_uni = val.get_varint()?;
},
0x0008 => {
let max = val.get_varint()?;
if max > 2u64.pow(60) {
return Err(Error::StreamLimit);
}
tp.initial_max_streams_bidi = max;
},
0x0009 => {
let max = val.get_varint()?;
if max > 2u64.pow(60) {
return Err(Error::StreamLimit);
}
tp.initial_max_streams_uni = max;
},
0x000a => {
let ack_delay_exponent = val.get_varint()?;
if ack_delay_exponent > 20 {
return Err(Error::InvalidTransportParam);
}
tp.ack_delay_exponent = ack_delay_exponent;
},
0x000b => {
let max_ack_delay = val.get_varint()?;
if max_ack_delay >= 2_u64.pow(14) {
return Err(Error::InvalidTransportParam);
}
tp.max_ack_delay = max_ack_delay;
},
0x000c => {
tp.disable_migration = true;
},
0x000d => {
if is_server {
return Err(Error::InvalidTransportParam);
}
},
_ => (),
}
}
Ok(tp)
}
/// Serializes `tp` into `out` and returns the written prefix of `out`.
///
/// The output is a 16-bit total length followed by one entry per
/// parameter (16-bit id, 16-bit value length, value).
///
/// * `original_connection_id` and `stateless_reset_token` are written only
///   when `is_server` is `true`.
/// * `idle_timeout`, `max_packet_size` and the `initial_max_*` values are
///   omitted when they are zero.
/// * `ack_delay_exponent` and `max_ack_delay` are always written: a
///   decoder falls back to the non-zero defaults (3 and 25) when they are
///   absent, so leaving out an explicit zero would make the peer use a
///   different value than the one configured here.
///
/// Fails with the error produced by the buffer writes when the scratch
/// buffer or `out` is too small.
fn encode<'a>(
    tp: &TransportParams, is_server: bool, out: &'a mut [u8],
) -> Result<&'a mut [u8]> {
    // Writes one varint-valued parameter as (id, value length, value).
    fn put_varint_param(
        b: &mut octets::Octets, id: u16, value: u64,
    ) -> Result<()> {
        b.put_u16(id)?;
        b.put_u16(octets::varint_len(value) as u16)?;
        b.put_varint(value)?;

        Ok(())
    }

    // Scratch space for the parameter list. The ten varint parameters
    // alone can take 120 bytes (4-byte header plus up to 8-byte value
    // each) before the connection ID and reset token are counted, so 128
    // bytes was not enough for valid large values.
    let mut params = [0; 256];

    let params_len = {
        let mut b = octets::Octets::with_slice(&mut params);

        if is_server {
            if let Some(ref odcid) = tp.original_connection_id {
                b.put_u16(0x0000)?;
                b.put_u16(odcid.len() as u16)?;
                b.put_bytes(odcid)?;
            }
        }

        if tp.idle_timeout != 0 {
            put_varint_param(&mut b, 0x0001, tp.idle_timeout)?;
        }

        if let Some(ref token) = tp.stateless_reset_token {
            if is_server {
                b.put_u16(0x0002)?;
                b.put_u16(token.len() as u16)?;
                b.put_bytes(token)?;
            }
        }

        if tp.max_packet_size != 0 {
            put_varint_param(&mut b, 0x0003, tp.max_packet_size)?;
        }

        if tp.initial_max_data != 0 {
            put_varint_param(&mut b, 0x0004, tp.initial_max_data)?;
        }

        if tp.initial_max_stream_data_bidi_local != 0 {
            put_varint_param(
                &mut b,
                0x0005,
                tp.initial_max_stream_data_bidi_local,
            )?;
        }

        if tp.initial_max_stream_data_bidi_remote != 0 {
            put_varint_param(
                &mut b,
                0x0006,
                tp.initial_max_stream_data_bidi_remote,
            )?;
        }

        if tp.initial_max_stream_data_uni != 0 {
            put_varint_param(&mut b, 0x0007, tp.initial_max_stream_data_uni)?;
        }

        if tp.initial_max_streams_bidi != 0 {
            put_varint_param(&mut b, 0x0008, tp.initial_max_streams_bidi)?;
        }

        if tp.initial_max_streams_uni != 0 {
            put_varint_param(&mut b, 0x0009, tp.initial_max_streams_uni)?;
        }

        // Always sent, including zero: absence means 3, not 0.
        put_varint_param(&mut b, 0x000a, tp.ack_delay_exponent)?;

        // Always sent, including zero: absence means 25, not 0.
        put_varint_param(&mut b, 0x000b, tp.max_ack_delay)?;

        if tp.disable_migration {
            // Flag parameter: present with an empty value.
            b.put_u16(0x000c)?;
            b.put_u16(0)?;
        }

        b.off()
    };

    let out_len = {
        let mut b = octets::Octets::with_slice(out);

        b.put_u16(params_len as u16)?;
        b.put_bytes(&params[..params_len])?;

        b.off()
    };

    Ok(&mut out[..out_len])
}
}
impl std::fmt::Debug for TransportParams {
    /// Prints the numeric and boolean parameters as space-separated
    /// `name=value` pairs.
    ///
    /// `original_connection_id` and `stateless_reset_token` are not part
    /// of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            concat!(
                "idle_timeout={} ",
                "max_packet_size={} ",
                "initial_max_data={} ",
                "initial_max_stream_data_bidi_local={} ",
                "initial_max_stream_data_bidi_remote={} ",
                "initial_max_stream_data_uni={} ",
                "initial_max_streams_bidi={} ",
                "initial_max_streams_uni={} ",
                "ack_delay_exponent={} ",
                "max_ack_delay={} ",
                "disable_migration={}",
            ),
            self.idle_timeout,
            self.max_packet_size,
            self.initial_max_data,
            self.initial_max_stream_data_bidi_local,
            self.initial_max_stream_data_bidi_remote,
            self.initial_max_stream_data_uni,
            self.initial_max_streams_bidi,
            self.initial_max_streams_uni,
            self.ack_delay_exponent,
            self.max_ack_delay,
            self.disable_migration,
        )
    }
}
#[doc(hidden)]
pub mod testing {
use super::*;
/// A client connection and a server connection paired together so tests
/// can pass packets between them through a shared buffer.
pub struct Pipe {
    /// The endpoint created with `connect()`.
    pub client: Box<Connection>,
    /// The endpoint created with `accept()`.
    pub server: Box<Connection>,
}
impl Pipe {
pub fn default() -> Result<Pipe> {
let mut config = Config::new(crate::PROTOCOL_VERSION)?;
config.load_cert_chain_from_pem_file("examples/cert.crt")?;
config.load_priv_key_from_pem_file("examples/cert.key")?;
config.set_application_protos(b"\x06proto1\x06proto2")?;
config.set_initial_max_data(30);
config.set_initial_max_stream_data_bidi_local(15);
config.set_initial_max_stream_data_bidi_remote(15);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.verify_peer(false);
Pipe::with_config(&mut config)
}
/// Creates a pipe in which both endpoints are built from `config`, each
/// with its own random 16-byte source connection ID.
pub fn with_config(config: &mut Config) -> Result<Pipe> {
    let mut client_scid = [0; 16];
    rand::rand_bytes(&mut client_scid[..]);

    let mut server_scid = [0; 16];
    rand::rand_bytes(&mut server_scid[..]);

    let client = connect(Some("quic.tech"), &client_scid, config)?;
    let server = accept(&server_scid, None, config)?;

    Ok(Pipe { client, server })
}
/// Creates a pipe whose client uses `client_config` while the server uses
/// a built-in configuration (example certificate and key, two application
/// protocols, small flow-control and stream limits).
pub fn with_client_config(client_config: &mut Config) -> Result<Pipe> {
    let mut client_scid = [0; 16];
    rand::rand_bytes(&mut client_scid[..]);

    let mut server_scid = [0; 16];
    rand::rand_bytes(&mut server_scid[..]);

    // Server-side configuration.
    let mut server_config = Config::new(crate::PROTOCOL_VERSION)?;
    server_config.load_cert_chain_from_pem_file("examples/cert.crt")?;
    server_config.load_priv_key_from_pem_file("examples/cert.key")?;
    server_config.set_application_protos(b"\x06proto1\x06proto2")?;
    server_config.set_initial_max_data(30);
    server_config.set_initial_max_stream_data_bidi_local(15);
    server_config.set_initial_max_stream_data_bidi_remote(15);
    server_config.set_initial_max_streams_bidi(3);
    server_config.set_initial_max_streams_uni(3);

    let client = connect(Some("quic.tech"), &client_scid, client_config)?;
    let server = accept(&server_scid, None, &mut server_config)?;

    Ok(Pipe { client, server })
}
/// Creates a pipe whose server uses `server_config` while the client uses
/// a built-in configuration (two application protocols and small
/// flow-control and stream limits; no certificate is loaded).
pub fn with_server_config(server_config: &mut Config) -> Result<Pipe> {
    let mut client_scid = [0; 16];
    rand::rand_bytes(&mut client_scid[..]);

    let mut server_scid = [0; 16];
    rand::rand_bytes(&mut server_scid[..]);

    // Client-side configuration.
    let mut client_config = Config::new(crate::PROTOCOL_VERSION)?;
    client_config.set_application_protos(b"\x06proto1\x06proto2")?;
    client_config.set_initial_max_data(30);
    client_config.set_initial_max_stream_data_bidi_local(15);
    client_config.set_initial_max_stream_data_bidi_remote(15);
    client_config.set_initial_max_streams_bidi(3);
    client_config.set_initial_max_streams_uni(3);

    let client = connect(Some("quic.tech"), &client_scid, &mut client_config)?;
    let server = accept(&server_scid, None, server_config)?;

    Ok(Pipe { client, server })
}
/// Drives the handshake by bouncing packets between the two endpoints.
///
/// The exchange loop stops as soon as either endpoint reports being
/// established; the last data produced is then delivered to the server.
pub fn handshake(&mut self, buf: &mut [u8]) -> Result<()> {
    let mut pending = self.client.send(buf)?;

    loop {
        if self.client.is_established() || self.server.is_established() {
            break;
        }

        pending = recv_send(&mut self.server, buf, pending)?;
        pending = recv_send(&mut self.client, buf, pending)?;
    }

    // Hand the final flight to the server; its reply is not needed.
    recv_send(&mut self.server, buf, pending).map(|_| ())
}
pub fn flush_client(&mut self, buf: &mut [u8]) -> Result<()> {
loop {
let len = match self.client.send(buf) {
Ok(write) => write,
Err(Error::Done) => break,
Err(e) => return Err(e),
};
match self.server.recv(&mut buf[..len]) {
Ok(_) => (),
Err(Error::Done) => (),
Err(e) => return Err(e),
}
}
Ok(())
}
pub fn flush_server(&mut self, buf: &mut [u8]) -> Result<()> {
loop {
let len = match self.server.send(buf) {
Ok(write) => write,
Err(Error::Done) => break,
Err(e) => return Err(e),
};
match self.client.recv(&mut buf[..len]) {
Ok(_) => (),
Err(Error::Done) => (),
Err(e) => return Err(e),
}
}
Ok(())
}
/// Alternates `recv_send()` between client and server until a full round
/// ends with neither side having produced any output.
pub fn advance(&mut self, buf: &mut [u8]) -> Result<()> {
    let mut in_flight = 0;

    loop {
        in_flight = recv_send(&mut self.client, buf, in_flight)?;
        let client_idle = in_flight == 0;

        in_flight = recv_send(&mut self.server, buf, in_flight)?;
        let server_idle = in_flight == 0;

        if client_idle && server_idle {
            return Ok(());
        }
    }
}
/// Builds a packet of type `pkt_type` carrying `frames` with the client's
/// state, feeds it to the server, and returns the number of bytes the
/// server wrote into `buf` in response.
pub fn send_pkt_to_server(
    &mut self, pkt_type: packet::Type, frames: &[frame::Frame],
    buf: &mut [u8],
) -> Result<usize> {
    let pkt_len = encode_pkt(&mut self.client, pkt_type, frames, buf)?;

    recv_send(&mut self.server, buf, pkt_len)
}
}
/// Feeds the first `len` bytes of `buf` to `conn`, then overwrites `buf`
/// with whatever `conn` wants to send and returns that byte count.
///
/// Panics if `conn` stops with `Error::Done` before all `len` input bytes
/// have been consumed.
pub fn recv_send(
    conn: &mut Connection, buf: &mut [u8], len: usize,
) -> Result<usize> {
    // Input phase: keep calling recv() on the unread tail.
    let mut remaining = len;

    while remaining > 0 {
        let unread = &mut buf[len - remaining..len];

        match conn.recv(unread) {
            Ok(read) => remaining -= read,
            Err(Error::Done) => break,
            Err(e) => return Err(e),
        }
    }

    assert_eq!(remaining, 0);

    // Output phase: pack outgoing data back to back into `buf`.
    let mut written = 0;

    while written < buf.len() {
        match conn.send(&mut buf[written..]) {
            Ok(n) => written += n,
            Err(Error::Done) => break,
            Err(e) => return Err(e),
        }
    }

    Ok(written)
}
pub fn encode_pkt(
conn: &mut Connection, pkt_type: packet::Type, frames: &[frame::Frame],
buf: &mut [u8],
) -> Result<usize> {
let mut b = octets::Octets::with_slice(buf);
let epoch = pkt_type.to_epoch()?;
let space = &mut conn.pkt_num_spaces[epoch];
let pn = space.next_pkt_num;
let pn_len = packet::pkt_num_len(pn)?;
let hdr = Header {
ty: pkt_type,
version: conn.version,
dcid: conn.dcid.clone(),
scid: conn.scid.clone(),
pkt_num: 0,
pkt_num_len: pn_len,
odcid: None,
token: conn.token.clone(),
versions: None,
key_phase: false,
};
hdr.to_bytes(&mut b)?;
let payload_len =
frames.iter().fold(0, |acc, x| acc + x.wire_len()) + space.overhead();
if pkt_type != packet::Type::Application {
let len = pn_len + payload_len;
b.put_varint(len as u64)?;
}
packet::encode_pkt_num(pn, &mut b)?;
let payload_offset = b.off();
for frame in frames {
frame.to_bytes(&mut b)?;
}
let aead = match space.crypto_seal {
Some(ref v) => v,
None => return Err(Error::InvalidState),
};
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
aead,
)?;
space.next_pkt_num += 1;
Ok(written)
}
/// Decrypts the packet stored in the first `len` bytes of `buf` with
/// `conn`'s opening key and returns the frames it contains.
///
/// Panics when the header cannot be parsed, when the packet number space
/// has no `crypto_open`, or when decryption fails; frame parsing errors
/// and an unmappable packet type are returned as `Err`.
pub fn decode_pkt(
    conn: &mut Connection, buf: &mut [u8], len: usize,
) -> Result<Vec<frame::Frame>> {
    let mut input = octets::Octets::with_slice(&mut buf[..len]);

    let mut hdr = Header::from_bytes(&mut input, conn.scid.len()).unwrap();

    let epoch = hdr.ty.to_epoch()?;
    let space = &conn.pkt_num_spaces[epoch];

    let aead = space.crypto_open.as_ref().unwrap();

    // Everything left after the header is treated as the payload.
    let payload_len = input.cap();

    packet::decrypt_hdr(&mut input, &mut hdr, &aead).unwrap();

    let pn = packet::decode_pkt_num(
        space.largest_rx_pkt_num,
        hdr.pkt_num,
        hdr.pkt_num_len,
    );

    let mut payload = packet::decrypt_pkt(
        &mut input,
        pn,
        hdr.pkt_num_len,
        payload_len,
        aead,
    )
    .unwrap();

    let mut parsed = Vec::new();

    while payload.cap() > 0 {
        parsed.push(frame::Frame::from_bytes(&mut payload, hdr.ty)?);
    }

    Ok(parsed)
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Round-trips a fully populated parameter set through `encode()` and
/// `decode()` and checks the encoded size.
#[test]
fn transport_params() {
    let original = TransportParams {
        original_connection_id: None,
        idle_timeout: 30,
        stateless_reset_token: Some(vec![0xba; 16]),
        max_packet_size: 23_421,
        initial_max_data: 424_645_563,
        initial_max_stream_data_bidi_local: 154_323_123,
        initial_max_stream_data_bidi_remote: 6_587_456,
        initial_max_stream_data_uni: 2_461_234,
        initial_max_streams_bidi: 12_231,
        initial_max_streams_uni: 18_473,
        ack_delay_exponent: 20,
        max_ack_delay: 2_u64.pow(14) - 1,
        disable_migration: true,
    };

    // Encode as a server so the reset token is written.
    let mut storage = [42; 256];
    let encoded =
        TransportParams::encode(&original, true, &mut storage).unwrap();
    assert_eq!(encoded.len(), 96);

    // Decode as a client so the reset token is accepted.
    let decoded = TransportParams::decode(encoded, false).unwrap();
    assert_eq!(decoded, original);
}
/// A client using version `0xbabababa` makes the handshake fail with
/// `Error::UnknownVersion`.
#[test]
fn unknown_version() {
    let mut scratch = [0; 65535];

    let mut client_config = Config::new(0xbabababa).unwrap();
    client_config.verify_peer(false);

    let mut pipe =
        testing::Pipe::with_client_config(&mut client_config).unwrap();

    let outcome = pipe.handshake(&mut scratch);
    assert_eq!(outcome, Err(Error::UnknownVersion));
}
/// After receiving a version negotiation packet, a client that started
/// with version `0xbabababa` can still complete the handshake.
#[test]
fn version_negotiation() {
    let mut scratch = [0; 65535];

    let mut client_config = Config::new(0xbabababa).unwrap();
    client_config
        .set_application_protos(b"\x06proto1\x06proto2")
        .unwrap();
    client_config.verify_peer(false);

    let mut pipe =
        testing::Pipe::with_client_config(&mut client_config).unwrap();

    // Capture the client's first packet and answer it by hand.
    let initial_len = pipe.client.send(&mut scratch).unwrap();
    let hdr =
        packet::Header::from_slice(&mut scratch[..initial_len], 0).unwrap();

    let vneg_len =
        crate::negotiate_version(&hdr.scid, &hdr.dcid, &mut scratch).unwrap();

    assert_eq!(
        pipe.client.recv(&mut scratch[..vneg_len]),
        Err(Error::Done)
    );

    assert_eq!(pipe.handshake(&mut scratch), Ok(()));
}
/// The default pipe completes the handshake and both sides agree on the
/// application protocol.
#[test]
fn handshake() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    assert_eq!(
        pipe.client.application_proto(),
        pipe.server.application_proto()
    );
}
/// With no application protocol in common the handshake fails with
/// `Error::TlsFail` and neither side reports a protocol.
#[test]
fn handshake_alpn_mismatch() {
    let mut scratch = [0; 65535];

    // The client only offers protocols the server does not know.
    let mut client_config = Config::new(PROTOCOL_VERSION).unwrap();
    client_config
        .set_application_protos(b"\x06proto3\x06proto4")
        .unwrap();
    client_config.verify_peer(false);

    let mut pipe =
        testing::Pipe::with_client_config(&mut client_config).unwrap();

    assert_eq!(pipe.handshake(&mut scratch), Err(Error::TlsFail));

    assert_eq!(pipe.client.application_proto(), b"");
    assert_eq!(pipe.server.application_proto(), b"");
}
/// A server with a large certificate chain must cap its first flight at
/// `MAX_AMPLIFICATION_FACTOR` times what the client has sent.
#[test]
fn limit_handshake_data() {
    let mut buf = [0; 65535];

    let mut config = Config::new(PROTOCOL_VERSION).unwrap();
    config
        .load_cert_chain_from_pem_file("examples/cert-big.crt")
        .unwrap();
    config
        .load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    // Each protocol name is prefixed by its length byte. The second
    // prefix must be spelled `\x06`: `\06` is a NUL byte followed by the
    // character '6', which yields a malformed protocol list.
    config
        .set_application_protos(b"\x06proto1\x06proto2")
        .unwrap();

    let mut pipe = testing::Pipe::with_server_config(&mut config).unwrap();

    let client_sent = pipe.client.send(&mut buf).unwrap();
    let server_sent =
        testing::recv_send(&mut pipe.server, &mut buf, client_sent).unwrap();

    assert_eq!(server_sent, (client_sent - 1) * MAX_AMPLIFICATION_FACTOR);
}
/// Data sent with the fin flag on stream 4 becomes readable on the
/// server, and the stream counts as finished only once it has been read.
#[test]
fn stream() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));

    // Not finished until the application has drained the data.
    assert!(!pipe.server.stream_finished(4));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), Some(4));
    assert_eq!(readable.next(), None);

    let mut received = [0; 15];
    assert_eq!(pipe.server.stream_recv(4, &mut received), Ok((12, true)));
    assert_eq!(&received[..12], b"hello, world");

    assert!(pipe.server.stream_finished(4));
}
/// Sending 31 bytes across three streams exceeds the connection-level
/// limit of 30 and is rejected with `Error::FlowControl`.
#[test]
fn flow_control_limit() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    // 15 + 15 + 1 bytes.
    let over_limit = [
        frame::Frame::Stream {
            stream_id: 4,
            data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
        },
        frame::Frame::Stream {
            stream_id: 8,
            data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
        },
        frame::Frame::Stream {
            stream_id: 12,
            data: stream::RangeBuf::from(b"a", 0, false),
        },
    ];

    let outcome = pipe.send_pkt_to_server(
        packet::Type::Application,
        &over_limit,
        &mut scratch,
    );

    assert_eq!(outcome, Err(Error::FlowControl));
}
/// Once the application has read the received data, the server's next
/// packet carries a `MaxData` frame with the limit raised to 46.
#[test]
fn flow_control_update() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let pkt_type = packet::Type::Application;

    let first_batch = [
        frame::Frame::Stream {
            stream_id: 4,
            data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
        },
        frame::Frame::Stream {
            stream_id: 8,
            data: stream::RangeBuf::from(b"a", 0, false),
        },
    ];

    assert!(pipe
        .send_pkt_to_server(pkt_type, &first_batch, &mut scratch)
        .is_ok());

    // Drain both streams on the server.
    pipe.server.stream_recv(4, &mut scratch).unwrap();
    pipe.server.stream_recv(8, &mut scratch).unwrap();

    let second_batch = [frame::Frame::Stream {
        stream_id: 8,
        data: stream::RangeBuf::from(b"a", 1, false),
    }];

    let reply_len = pipe
        .send_pkt_to_server(pkt_type, &second_batch, &mut scratch)
        .unwrap();
    assert!(reply_len > 0);

    let reply =
        testing::decode_pkt(&mut pipe.client, &mut scratch, reply_len)
            .unwrap();
    let mut reply_frames = reply.iter();

    // Skip the first frame of the reply.
    reply_frames.next().unwrap();

    assert_eq!(
        reply_frames.next(),
        Some(&frame::Frame::MaxData { max: 46 })
    );
}
/// Sending 16 bytes on one stream exceeds the per-stream limit of 15 and
/// is rejected with `Error::FlowControl`.
#[test]
fn stream_flow_control_limit() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let over_limit = [frame::Frame::Stream {
        stream_id: 4,
        data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaaa", 0, true),
    }];

    let outcome = pipe.send_pkt_to_server(
        packet::Type::Application,
        &over_limit,
        &mut scratch,
    );

    assert_eq!(outcome, Err(Error::FlowControl));
}
/// Once the application has read stream 4, the server's next packet
/// carries a `MaxStreamData` frame raising that stream's limit to 22.
#[test]
fn stream_flow_control_update() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let pkt_type = packet::Type::Application;

    let first_batch = [frame::Frame::Stream {
        stream_id: 4,
        data: stream::RangeBuf::from(b"aaaaaaa", 0, false),
    }];

    assert!(pipe
        .send_pkt_to_server(pkt_type, &first_batch, &mut scratch)
        .is_ok());

    // Drain the stream on the server.
    pipe.server.stream_recv(4, &mut scratch).unwrap();

    let second_batch = [frame::Frame::Stream {
        stream_id: 4,
        data: stream::RangeBuf::from(b"a", 7, false),
    }];

    let reply_len = pipe
        .send_pkt_to_server(pkt_type, &second_batch, &mut scratch)
        .unwrap();
    assert!(reply_len > 0);

    let reply =
        testing::decode_pkt(&mut pipe.client, &mut scratch, reply_len)
            .unwrap();
    let mut reply_frames = reply.iter();

    // Skip the first frame of the reply.
    reply_frames.next().unwrap();

    assert_eq!(
        reply_frames.next(),
        Some(&frame::Frame::MaxStreamData {
            stream_id: 4,
            max: 22,
        })
    );
}
/// Opening seven bidirectional streams (ids 4, 8, ..., 28) in one packet
/// is rejected with `Error::StreamLimit`.
#[test]
fn stream_limit_bidi() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    // One single-byte frame per stream id 4 * n, for n = 1..=7.
    let too_many: Vec<frame::Frame> = (1..=7)
        .map(|n| frame::Frame::Stream {
            stream_id: n * 4,
            data: stream::RangeBuf::from(b"a", 0, false),
        })
        .collect();

    let outcome = pipe.send_pkt_to_server(
        packet::Type::Application,
        &too_many,
        &mut scratch,
    );

    assert_eq!(outcome, Err(Error::StreamLimit));
}
/// A `MaxStreamsBidi` frame is accepted at exactly 2^60 and rejected with
/// `Error::StreamLimit` one above that.
#[test]
fn stream_limit_max_bidi() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let pkt_type = packet::Type::Application;
    let bound = 2u64.pow(60);

    let at_bound = [frame::Frame::MaxStreamsBidi { max: bound }];
    assert!(pipe
        .send_pkt_to_server(pkt_type, &at_bound, &mut scratch)
        .is_ok());

    let above_bound = [frame::Frame::MaxStreamsBidi { max: bound + 1 }];
    assert_eq!(
        pipe.send_pkt_to_server(pkt_type, &above_bound, &mut scratch),
        Err(Error::StreamLimit),
    );
}
/// Opening seven unidirectional streams (ids 2, 6, ..., 26) in one packet
/// is rejected with `Error::StreamLimit`.
#[test]
fn stream_limit_uni() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    // One single-byte frame per stream id 4 * n + 2, for n = 0..7.
    let too_many: Vec<frame::Frame> = (0..7)
        .map(|n| frame::Frame::Stream {
            stream_id: n * 4 + 2,
            data: stream::RangeBuf::from(b"a", 0, false),
        })
        .collect();

    let outcome = pipe.send_pkt_to_server(
        packet::Type::Application,
        &too_many,
        &mut scratch,
    );

    assert_eq!(outcome, Err(Error::StreamLimit));
}
/// A `MaxStreamsUni` frame is accepted at exactly 2^60 and rejected with
/// `Error::StreamLimit` one above that.
#[test]
fn stream_limit_max_uni() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let pkt_type = packet::Type::Application;
    let bound = 2u64.pow(60);

    let at_bound = [frame::Frame::MaxStreamsUni { max: bound }];
    assert!(pipe
        .send_pkt_to_server(pkt_type, &at_bound, &mut scratch)
        .is_ok());

    let above_bound = [frame::Frame::MaxStreamsUni { max: bound + 1 }];
    assert_eq!(
        pipe.send_pkt_to_server(pkt_type, &above_bound, &mut scratch),
        Err(Error::StreamLimit),
    );
}
/// Overlapping stream frames delivered in offset order: bytes that were
/// already received are kept and only the new tail of each later frame is
/// appended.
#[test]
fn stream_data_overlap() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    // Offsets 0..5, 3..8 and 6..11 on stream 0.
    let overlapping = [
        frame::Frame::Stream {
            stream_id: 0,
            data: stream::RangeBuf::from(b"aaaaa", 0, false),
        },
        frame::Frame::Stream {
            stream_id: 0,
            data: stream::RangeBuf::from(b"bbbbb", 3, false),
        },
        frame::Frame::Stream {
            stream_id: 0,
            data: stream::RangeBuf::from(b"ccccc", 6, false),
        },
    ];

    assert!(pipe
        .send_pkt_to_server(
            packet::Type::Application,
            &overlapping,
            &mut scratch
        )
        .is_ok());

    let mut received = [0; 15];
    assert_eq!(pipe.server.stream_recv(0, &mut received), Ok((11, false)));
    assert_eq!(&received[..11], b"aaaaabbbccc");
}
/// Overlapping stream frames delivered out of offset order: the frame
/// arriving last only fills the gap left between the earlier two.
#[test]
fn stream_data_overlap_with_reordering() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    // Offsets 0..5, then 6..11, then 3..8 on stream 0.
    let reordered = [
        frame::Frame::Stream {
            stream_id: 0,
            data: stream::RangeBuf::from(b"aaaaa", 0, false),
        },
        frame::Frame::Stream {
            stream_id: 0,
            data: stream::RangeBuf::from(b"ccccc", 6, false),
        },
        frame::Frame::Stream {
            stream_id: 0,
            data: stream::RangeBuf::from(b"bbbbb", 3, false),
        },
    ];

    assert!(pipe
        .send_pkt_to_server(
            packet::Type::Application,
            &reordered,
            &mut scratch
        )
        .is_ok());

    let mut received = [0; 15];
    assert_eq!(pipe.server.stream_recv(0, &mut received), Ok((11, false)));
    assert_eq!(&received[..11], b"aaaaabccccc");
}
/// The final size carried by a `ResetStream` frame counts toward the
/// connection-level limit, so the frame that follows it fails with
/// `Error::FlowControl`.
#[test]
fn reset_stream_flow_control() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let sequence = [
        frame::Frame::Stream {
            stream_id: 4,
            data: stream::RangeBuf::from(b"aaaaaaaaaaaaaaa", 0, false),
        },
        frame::Frame::Stream {
            stream_id: 8,
            data: stream::RangeBuf::from(b"a", 0, false),
        },
        // Stream 8 is reset with a final size of 15.
        frame::Frame::ResetStream {
            stream_id: 8,
            error_code: 0,
            final_size: 15,
        },
        frame::Frame::Stream {
            stream_id: 12,
            data: stream::RangeBuf::from(b"a", 0, false),
        },
    ];

    let outcome = pipe.send_pkt_to_server(
        packet::Type::Application,
        &sequence,
        &mut scratch,
    );

    assert_eq!(outcome, Err(Error::FlowControl));
}
/// A `PathChallenge` frame is answered with a `PathResponse` frame that
/// echoes the same eight bytes.
#[test]
fn path_challenge() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    let challenge = [frame::Frame::PathChallenge {
        data: vec![0xba; 8],
    }];

    let reply_len = pipe
        .send_pkt_to_server(
            packet::Type::Application,
            &challenge,
            &mut scratch,
        )
        .unwrap();
    assert!(reply_len > 0);

    let reply =
        testing::decode_pkt(&mut pipe.client, &mut scratch, reply_len)
            .unwrap();
    let mut reply_frames = reply.iter();

    // Skip the first frame of the reply.
    reply_frames.next().unwrap();

    assert_eq!(
        reply_frames.next(),
        Some(&frame::Frame::PathResponse {
            data: vec![0xba; 8],
        })
    );
}
/// Application data that reaches the server before the client's final
/// handshake flight is held back and processed once that delayed flight
/// arrives.
#[test]
fn buffer_early_app_frames() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();

    // Run the handshake by hand up to the client's last flight.
    let mut len = pipe.client.send(&mut scratch).unwrap();
    len = testing::recv_send(&mut pipe.server, &mut scratch, len).unwrap();
    len = testing::recv_send(&mut pipe.client, &mut scratch, len).unwrap();

    // Keep that flight aside instead of delivering it.
    let mut held_back = (&scratch[..len]).to_vec();

    testing::recv_send(&mut pipe.server, &mut scratch, 0).unwrap();

    assert!(pipe.client.is_established());

    // The client sends stream data while the server is still waiting.
    assert_eq!(pipe.client.streams.iter_mut().len(), 0);
    assert_eq!(pipe.client.stream_send(4, b"hello, world", true), Ok(12));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));
    assert_eq!(pipe.client.streams.iter_mut().len(), 1);

    assert!(!pipe.server.is_established());
    assert_eq!(pipe.server.streams.iter_mut().len(), 0);

    // Delivering the delayed flight completes the server's handshake.
    pipe.server.recv(&mut held_back).unwrap();

    assert!(pipe.server.is_established());
    assert_eq!(pipe.server.streams.iter_mut().len(), 1);

    assert_eq!(pipe.client.stats().sent, pipe.server.stats().recv);
}
/// Same scenario as `buffer_early_app_frames`, but the client sends one
/// more early stream than `recovery::INITIAL_WINDOW_PACKETS`; the server
/// ends up with one stream and one received packet fewer than the client
/// sent.
#[test]
fn buffer_early_app_frames_limit() {
    let mut scratch = [0; 65535];

    let mut cfg = Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.load_cert_chain_from_pem_file("examples/cert.crt")
        .unwrap();
    cfg.load_priv_key_from_pem_file("examples/cert.key")
        .unwrap();
    cfg.set_application_protos(b"\x06proto1\x06proto2")
        .unwrap();
    cfg.set_initial_max_data(256);
    cfg.set_initial_max_stream_data_bidi_local(256);
    cfg.set_initial_max_stream_data_bidi_remote(256);
    cfg.set_initial_max_streams_bidi(13);
    cfg.set_initial_max_streams_uni(13);
    cfg.verify_peer(false);

    let mut pipe = testing::Pipe::with_config(&mut cfg).unwrap();

    // Run the handshake by hand up to the client's last flight.
    let mut len = pipe.client.send(&mut scratch).unwrap();
    len = testing::recv_send(&mut pipe.server, &mut scratch, len).unwrap();
    len = testing::recv_send(&mut pipe.client, &mut scratch, len).unwrap();

    // Keep that flight aside instead of delivering it.
    let mut held_back = (&scratch[..len]).to_vec();

    testing::recv_send(&mut pipe.server, &mut scratch, 0).unwrap();

    assert!(pipe.client.is_established());
    assert_eq!(pipe.client.streams.iter_mut().len(), 0);

    // One stream per iteration, on ids 4, 8, 12, ...
    for n in 1..=recovery::INITIAL_WINDOW_PACKETS + 1 {
        let stream_id = n as u64 * 4;

        pipe.client
            .stream_send(stream_id, b"hello, world", true)
            .unwrap();

        pipe.advance(&mut scratch).unwrap();
    }

    assert_eq!(pipe.client.streams.iter_mut().len(), 11);

    assert!(!pipe.server.is_established());
    assert_eq!(pipe.server.streams.iter_mut().len(), 0);

    // Delivering the delayed flight completes the server's handshake.
    pipe.server.recv(&mut held_back).unwrap();

    assert!(pipe.server.is_established());
    assert_eq!(pipe.server.streams.iter_mut().len(), 10);

    assert_eq!(pipe.client.stats().sent, pipe.server.stats().recv + 1);
}
/// After the server shuts down the read side of stream 4, the stream is
/// no longer reported as readable, even when more data arrives.
#[test]
fn stream_shutdown_read() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), Some(4));
    assert_eq!(readable.next(), None);

    assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 0), Ok(()));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), None);

    // Further data on the stream stays invisible to the reader.
    assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), None);
}
/// After the client shuts down the write side of stream 4, data queued
/// before or after the shutdown never becomes readable on the server,
/// although `stream_send()` still reports the bytes as accepted.
#[test]
fn stream_shutdown_write() {
    let mut scratch = [0; 65535];

    let mut pipe = testing::Pipe::default().unwrap();
    assert_eq!(pipe.handshake(&mut scratch), Ok(()));

    assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), Some(4));
    assert_eq!(readable.next(), None);

    // Drain what was delivered before the shutdown.
    let mut received = [0; 15];
    pipe.server.stream_recv(4, &mut received).unwrap();

    // Queue more data, then shut the write side down before flushing.
    assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
    assert_eq!(pipe.client.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), None);

    // Writes after the shutdown do not reach the server either.
    assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
    assert_eq!(pipe.advance(&mut scratch), Ok(()));

    let mut readable = pipe.server.readable();
    assert_eq!(readable.next(), None);
}
}
pub use crate::packet::Header;
pub use crate::packet::Type;
pub use crate::stream::Readable;
mod crypto;
mod ffi;
mod frame;
pub mod h3;
mod octets;
mod packet;
mod rand;
mod ranges;
mod recovery;
mod stream;
mod tls;