#![allow(clippy::upper_case_acronyms)]
#![warn(missing_docs)]
#![warn(unused_qualifications)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#[macro_use]
extern crate log;
use std::cmp;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
#[cfg(feature = "qlog")]
use qlog::events::quic::DataMovedAdditionalInfo;
#[cfg(feature = "qlog")]
use qlog::events::quic::QuicEventType;
#[cfg(feature = "qlog")]
use qlog::events::quic::TransportInitiator;
#[cfg(feature = "qlog")]
use qlog::events::DataRecipient;
#[cfg(feature = "qlog")]
use qlog::events::Event;
#[cfg(feature = "qlog")]
use qlog::events::EventData;
#[cfg(feature = "qlog")]
use qlog::events::EventImportance;
#[cfg(feature = "qlog")]
use qlog::events::EventType;
#[cfg(feature = "qlog")]
use qlog::events::RawInfo;
use smallvec::SmallVec;
use crate::buffers::DefaultBufFactory;
use crate::recovery::OnAckReceivedOutcome;
use crate::recovery::OnLossDetectionTimeoutOutcome;
use crate::recovery::RecoveryOps;
use crate::recovery::ReleaseDecision;
use crate::stream::RecvAction;
use crate::stream::StreamPriorityKey;
pub const PROTOCOL_VERSION: u32 = PROTOCOL_VERSION_V1;
const PROTOCOL_VERSION_V1: u32 = 0x0000_0001;
pub const MAX_CONN_ID_LEN: usize = packet::MAX_CID_LEN as usize;
pub const MIN_CLIENT_INITIAL_LEN: usize = 1200;
const DEFAULT_INITIAL_RTT: Duration = Duration::from_millis(333);
const PAYLOAD_MIN_LEN: usize = 4;
const MIN_PROBING_SIZE: usize = 25;
const MAX_AMPLIFICATION_FACTOR: usize = 3;
const MAX_ACK_RANGES: usize = 68;
const MAX_STREAM_ID: u64 = 1 << 60;
const MAX_SEND_UDP_PAYLOAD_SIZE: usize = 1200;
const DEFAULT_MAX_DGRAM_QUEUE_LEN: usize = 0;
const DEFAULT_MAX_PATH_CHALLENGE_RX_QUEUE_LEN: usize = 3;
const MAX_DGRAM_FRAME_SIZE: u64 = 65536;
const PAYLOAD_LENGTH_LEN: usize = 2;
const MAX_UNDECRYPTABLE_PACKETS: usize = 10;
const RESERVED_VERSION_MASK: u32 = 0xfafafafa;
const DEFAULT_CONNECTION_WINDOW: u64 = 48 * 1024;
const MAX_CONNECTION_WINDOW: u64 = 24 * 1024 * 1024;
const CONNECTION_WINDOW_FACTOR: f64 = 1.5;
const MAX_PROBING_TIMEOUTS: usize = 3;
const DEFAULT_INITIAL_CONGESTION_WINDOW_PACKETS: usize = 10;
const MAX_CRYPTO_STREAM_OFFSET: u64 = 1 << 16;
const TX_CAP_FACTOR: f64 = 1.0;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecvInfo {
pub from: SocketAddr,
pub to: SocketAddr,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SendInfo {
pub from: SocketAddr,
pub to: SocketAddr,
pub at: Instant,
}
#[repr(C)]
#[derive(PartialEq, Eq)]
pub enum Shutdown {
Read = 0,
Write = 1,
}
#[repr(C)]
#[cfg(feature = "qlog")]
#[cfg_attr(docsrs, doc(cfg(feature = "qlog")))]
pub enum QlogLevel {
Core = 0,
Base = 1,
Extra = 2,
}
pub struct Config {
local_transport_params: TransportParams,
version: u32,
tls_ctx: tls::Context,
application_protos: Vec<Vec<u8>>,
grease: bool,
cc_algorithm: CongestionControlAlgorithm,
custom_bbr_params: Option<BbrParams>,
initial_congestion_window_packets: usize,
enable_relaxed_loss_threshold: bool,
enable_cubic_idle_restart_fix: bool,
enable_send_streams_blocked: bool,
pmtud: bool,
pmtud_max_probes: u8,
hystart: bool,
pacing: bool,
max_pacing_rate: Option<u64>,
tx_cap_factor: f64,
dgram_recv_max_queue_len: usize,
dgram_send_max_queue_len: usize,
path_challenge_recv_max_queue_len: usize,
max_send_udp_payload_size: usize,
max_connection_window: u64,
max_stream_window: u64,
max_amplification_factor: usize,
disable_dcid_reuse: bool,
track_unknown_transport_params: Option<usize>,
initial_rtt: Duration,
use_initial_max_data_as_flow_control_win: bool,
}
fn is_reserved_version(version: u32) -> bool {
version & RESERVED_VERSION_MASK == version
}
impl Config {
pub fn new(version: u32) -> Result<Config> {
Self::with_tls_ctx(version, tls::Context::new()?)
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn with_boring_ssl_ctx_builder(
version: u32, tls_ctx_builder: boring::ssl::SslContextBuilder,
) -> Result<Config> {
Self::with_tls_ctx(version, tls::Context::from_boring(tls_ctx_builder))
}
fn with_tls_ctx(version: u32, tls_ctx: tls::Context) -> Result<Config> {
if !is_reserved_version(version) && !version_is_supported(version) {
return Err(Error::UnknownVersion);
}
Ok(Config {
local_transport_params: TransportParams::default(),
version,
tls_ctx,
application_protos: Vec::new(),
grease: true,
cc_algorithm: CongestionControlAlgorithm::CUBIC,
custom_bbr_params: None,
initial_congestion_window_packets:
DEFAULT_INITIAL_CONGESTION_WINDOW_PACKETS,
enable_relaxed_loss_threshold: false,
enable_cubic_idle_restart_fix: true,
enable_send_streams_blocked: false,
pmtud: false,
pmtud_max_probes: pmtud::MAX_PROBES_DEFAULT,
hystart: true,
pacing: true,
max_pacing_rate: None,
tx_cap_factor: TX_CAP_FACTOR,
dgram_recv_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
dgram_send_max_queue_len: DEFAULT_MAX_DGRAM_QUEUE_LEN,
path_challenge_recv_max_queue_len:
DEFAULT_MAX_PATH_CHALLENGE_RX_QUEUE_LEN,
max_send_udp_payload_size: MAX_SEND_UDP_PAYLOAD_SIZE,
max_connection_window: MAX_CONNECTION_WINDOW,
max_stream_window: stream::MAX_STREAM_WINDOW,
max_amplification_factor: MAX_AMPLIFICATION_FACTOR,
disable_dcid_reuse: false,
track_unknown_transport_params: None,
initial_rtt: DEFAULT_INITIAL_RTT,
use_initial_max_data_as_flow_control_win: false,
})
}
pub fn load_cert_chain_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx.use_certificate_chain_file(file)
}
pub fn load_priv_key_from_pem_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx.use_privkey_file(file)
}
pub fn load_verify_locations_from_file(&mut self, file: &str) -> Result<()> {
self.tls_ctx.load_verify_locations_from_file(file)
}
pub fn load_verify_locations_from_directory(
&mut self, dir: &str,
) -> Result<()> {
self.tls_ctx.load_verify_locations_from_directory(dir)
}
pub fn verify_peer(&mut self, verify: bool) {
self.tls_ctx.set_verify(verify);
}
pub fn discover_pmtu(&mut self, discover: bool) {
self.pmtud = discover;
}
pub fn set_pmtud_max_probes(&mut self, max_probes: u8) {
self.pmtud_max_probes = max_probes;
}
pub fn grease(&mut self, grease: bool) {
self.grease = grease;
}
pub fn log_keys(&mut self) {
self.tls_ctx.enable_keylog();
}
pub fn set_ticket_key(&mut self, key: &[u8]) -> Result<()> {
self.tls_ctx.set_ticket_key(key)
}
pub fn enable_early_data(&mut self) {
self.tls_ctx.set_early_data_enabled(true);
}
pub fn set_application_protos(
&mut self, protos_list: &[&[u8]],
) -> Result<()> {
self.application_protos =
protos_list.iter().map(|s| s.to_vec()).collect();
self.tls_ctx.set_alpn(protos_list)
}
pub fn set_application_protos_wire_format(
&mut self, protos: &[u8],
) -> Result<()> {
let mut b = octets::Octets::with_slice(protos);
let mut protos_list = Vec::new();
while let Ok(proto) = b.get_bytes_with_u8_length() {
protos_list.push(proto.buf());
}
self.set_application_protos(&protos_list)
}
pub fn set_max_amplification_factor(&mut self, v: usize) {
self.max_amplification_factor = v;
}
pub fn set_send_capacity_factor(&mut self, v: f64) {
self.tx_cap_factor = v;
}
pub fn set_initial_rtt(&mut self, v: Duration) {
self.initial_rtt = v;
}
pub fn set_max_idle_timeout(&mut self, v: u64) {
self.local_transport_params.max_idle_timeout =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_max_recv_udp_payload_size(&mut self, v: usize) {
self.local_transport_params.max_udp_payload_size =
cmp::min(v as u64, octets::MAX_VAR_INT);
}
pub fn set_max_send_udp_payload_size(&mut self, v: usize) {
self.max_send_udp_payload_size = cmp::max(v, MAX_SEND_UDP_PAYLOAD_SIZE);
}
pub fn set_initial_max_data(&mut self, v: u64) {
self.local_transport_params.initial_max_data =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_local =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) {
self.local_transport_params
.initial_max_stream_data_bidi_remote =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_initial_max_stream_data_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_stream_data_uni =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_initial_max_streams_bidi(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_bidi =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_initial_max_streams_uni(&mut self, v: u64) {
self.local_transport_params.initial_max_streams_uni =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_ack_delay_exponent(&mut self, v: u64) {
self.local_transport_params.ack_delay_exponent =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_max_ack_delay(&mut self, v: u64) {
self.local_transport_params.max_ack_delay =
cmp::min(v, octets::MAX_VAR_INT);
}
pub fn set_active_connection_id_limit(&mut self, v: u64) {
if v >= 2 {
self.local_transport_params.active_conn_id_limit =
cmp::min(v, octets::MAX_VAR_INT);
}
}
pub fn set_disable_active_migration(&mut self, v: bool) {
self.local_transport_params.disable_active_migration = v;
}
pub fn set_cc_algorithm(&mut self, algo: CongestionControlAlgorithm) {
self.cc_algorithm = algo;
}
#[cfg(feature = "internal")]
#[doc(hidden)]
pub fn set_custom_bbr_params(&mut self, custom_bbr_settings: BbrParams) {
self.custom_bbr_params = Some(custom_bbr_settings);
}
pub fn set_cc_algorithm_name(&mut self, name: &str) -> Result<()> {
self.cc_algorithm = CongestionControlAlgorithm::from_str(name)?;
Ok(())
}
pub fn set_initial_congestion_window_packets(&mut self, packets: usize) {
self.initial_congestion_window_packets = packets;
}
pub fn set_enable_relaxed_loss_threshold(&mut self, enable: bool) {
self.enable_relaxed_loss_threshold = enable;
}
pub fn set_enable_cubic_idle_restart_fix(&mut self, enable: bool) {
self.enable_cubic_idle_restart_fix = enable;
}
pub fn set_enable_send_streams_blocked(&mut self, enable: bool) {
self.enable_send_streams_blocked = enable;
}
pub fn enable_hystart(&mut self, v: bool) {
self.hystart = v;
}
pub fn enable_pacing(&mut self, v: bool) {
self.pacing = v;
}
pub fn set_max_pacing_rate(&mut self, v: u64) {
self.max_pacing_rate = Some(v);
}
pub fn enable_dgram(
&mut self, enabled: bool, recv_queue_len: usize, send_queue_len: usize,
) {
self.local_transport_params.max_datagram_frame_size = if enabled {
Some(MAX_DGRAM_FRAME_SIZE)
} else {
None
};
self.dgram_recv_max_queue_len = recv_queue_len;
self.dgram_send_max_queue_len = send_queue_len;
}
pub fn set_path_challenge_recv_max_queue_len(&mut self, queue_len: usize) {
self.path_challenge_recv_max_queue_len = queue_len;
}
pub fn set_max_connection_window(&mut self, v: u64) {
self.max_connection_window = v;
}
pub fn set_max_stream_window(&mut self, v: u64) {
self.max_stream_window = v;
}
pub fn set_stateless_reset_token(&mut self, v: Option<u128>) {
self.local_transport_params.stateless_reset_token = v;
}
pub fn set_disable_dcid_reuse(&mut self, v: bool) {
self.disable_dcid_reuse = v;
}
pub fn enable_track_unknown_transport_parameters(&mut self, size: usize) {
self.track_unknown_transport_params = Some(size);
}
pub fn set_use_initial_max_data_as_flow_control_win(&mut self, v: bool) {
self.use_initial_max_data_as_flow_control_win = v;
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum TxBufferTrackingState {
#[default]
Ok,
Inconsistent,
}
#[derive(Default)]
struct StreamsBlockedState {
blocked_at: Option<u64>,
blocked_sent: Option<u64>,
}
impl StreamsBlockedState {
fn has_pending_stream_blocked_frame(&self) -> bool {
self.blocked_sent < self.blocked_at
}
fn update_at(&mut self, limit: u64) {
self.blocked_at = self.blocked_at.max(Some(limit));
}
fn force_retransmit_sent_limit_eq(&mut self, limit: u64) {
if self.blocked_sent == Some(limit) {
self.blocked_sent = None;
}
}
}
pub struct Connection<F = DefaultBufFactory>
where
F: BufFactory,
{
version: u32,
ids: cid::ConnectionIdentifiers,
trace_id: String,
pkt_num_spaces: [packet::PktNumSpace; packet::Epoch::count()],
crypto_ctx: [packet::CryptoContext; packet::Epoch::count()],
next_pkt_num: u64,
pkt_num_manager: packet::PktNumManager,
peer_transport_params: TransportParams,
peer_transport_params_track_unknown: Option<usize>,
local_transport_params: TransportParams,
handshake: tls::Handshake,
session: Option<Vec<u8>>,
recovery_config: recovery::RecoveryConfig,
paths: path::PathMap,
path_challenge_recv_max_queue_len: usize,
path_challenge_rx_count: u64,
application_protos: Vec<Vec<u8>>,
recv_count: usize,
sent_count: usize,
lost_count: usize,
spurious_lost_count: usize,
retrans_count: usize,
dgram_sent_count: usize,
dgram_recv_count: usize,
rx_data: u64,
flow_control: flowcontrol::FlowControl,
should_send_max_data: bool,
should_send_max_streams_bidi: bool,
should_send_max_streams_uni: bool,
tx_cap: usize,
tx_cap_factor: f64,
tx_buffered: usize,
tx_buffered_state: TxBufferTrackingState,
tx_data: u64,
max_tx_data: u64,
last_tx_data: u64,
stream_retrans_bytes: u64,
sent_bytes: u64,
recv_bytes: u64,
acked_bytes: u64,
lost_bytes: u64,
streams: stream::StreamMap<F>,
odcid: Option<ConnectionId<'static>>,
rscid: Option<ConnectionId<'static>>,
token: Option<Vec<u8>>,
local_error: Option<ConnectionError>,
peer_error: Option<ConnectionError>,
blocked_limit: Option<u64>,
idle_timer: Option<Instant>,
draining_timer: Option<Instant>,
undecryptable_pkts: VecDeque<(Vec<u8>, RecvInfo)>,
alpn: Vec<u8>,
is_server: bool,
derived_initial_secrets: bool,
did_version_negotiation: bool,
did_retry: bool,
got_peer_conn_id: bool,
peer_verified_initial_address: bool,
parsed_peer_transport_params: bool,
handshake_completed: bool,
handshake_done_sent: bool,
handshake_done_acked: bool,
handshake_confirmed: bool,
key_phase: bool,
ack_eliciting_sent: bool,
closed: bool,
timed_out: bool,
grease: bool,
enable_send_streams_blocked: bool,
keylog: Option<Box<dyn std::io::Write + Send + Sync>>,
#[cfg(feature = "qlog")]
qlog: QlogInfo,
dgram_recv_queue: dgram::DatagramQueue<F>,
dgram_send_queue: dgram::DatagramQueue<F>,
emit_dgram: bool,
disable_dcid_reuse: bool,
reset_stream_local_count: u64,
stopped_stream_local_count: u64,
reset_stream_remote_count: u64,
stopped_stream_remote_count: u64,
data_blocked_sent_count: u64,
stream_data_blocked_sent_count: u64,
data_blocked_recv_count: u64,
stream_data_blocked_recv_count: u64,
streams_blocked_bidi_recv_count: u64,
streams_blocked_uni_recv_count: u64,
streams_blocked_bidi_state: StreamsBlockedState,
streams_blocked_uni_state: StreamsBlockedState,
max_amplification_factor: usize,
}
#[inline(always)]
pub fn accept(
scid: &ConnectionId, odcid: Option<&ConnectionId>, local: SocketAddr,
peer: SocketAddr, config: &mut Config,
) -> Result<Connection> {
accept_with_buf_factory(scid, odcid, local, peer, config)
}
#[inline]
pub fn accept_with_buf_factory<F: BufFactory>(
scid: &ConnectionId, odcid: Option<&ConnectionId>, local: SocketAddr,
peer: SocketAddr, config: &mut Config,
) -> Result<Connection<F>> {
let retry_cids = odcid.map(|odcid| RetryConnectionIds {
original_destination_cid: odcid,
retry_source_cid: scid,
});
Connection::new(scid, retry_cids, None, local, peer, config, true)
}
pub struct RetryConnectionIds<'a> {
pub original_destination_cid: &'a ConnectionId<'a>,
pub retry_source_cid: &'a ConnectionId<'a>,
}
#[inline]
pub fn accept_with_retry<F: BufFactory>(
scid: &ConnectionId, retry_cids: RetryConnectionIds, local: SocketAddr,
peer: SocketAddr, config: &mut Config,
) -> Result<Connection<F>> {
Connection::new(scid, Some(retry_cids), None, local, peer, config, true)
}
#[inline]
pub fn connect(
server_name: Option<&str>, scid: &ConnectionId, local: SocketAddr,
peer: SocketAddr, config: &mut Config,
) -> Result<Connection> {
let mut conn = Connection::new(scid, None, None, local, peer, config, false)?;
if let Some(server_name) = server_name {
conn.handshake.set_host_name(server_name)?;
}
Ok(conn)
}
#[cfg(feature = "custom-client-dcid")]
#[cfg_attr(docsrs, doc(cfg(feature = "custom-client-dcid")))]
pub fn connect_with_dcid(
server_name: Option<&str>, scid: &ConnectionId, dcid: &ConnectionId,
local: SocketAddr, peer: SocketAddr, config: &mut Config,
) -> Result<Connection> {
let mut conn =
Connection::new(scid, None, Some(dcid), local, peer, config, false)?;
if let Some(server_name) = server_name {
conn.handshake.set_host_name(server_name)?;
}
Ok(conn)
}
#[inline]
pub fn connect_with_buffer_factory<F: BufFactory>(
server_name: Option<&str>, scid: &ConnectionId, local: SocketAddr,
peer: SocketAddr, config: &mut Config,
) -> Result<Connection<F>> {
let mut conn = Connection::new(scid, None, None, local, peer, config, false)?;
if let Some(server_name) = server_name {
conn.handshake.set_host_name(server_name)?;
}
Ok(conn)
}
#[cfg(feature = "custom-client-dcid")]
#[cfg_attr(docsrs, doc(cfg(feature = "custom-client-dcid")))]
pub fn connect_with_dcid_and_buffer_factory<F: BufFactory>(
server_name: Option<&str>, scid: &ConnectionId, dcid: &ConnectionId,
local: SocketAddr, peer: SocketAddr, config: &mut Config,
) -> Result<Connection<F>> {
let mut conn =
Connection::new(scid, None, Some(dcid), local, peer, config, false)?;
if let Some(server_name) = server_name {
conn.handshake.set_host_name(server_name)?;
}
Ok(conn)
}
#[inline]
pub fn negotiate_version(
scid: &ConnectionId, dcid: &ConnectionId, out: &mut [u8],
) -> Result<usize> {
packet::negotiate_version(scid, dcid, out)
}
#[inline]
pub fn retry(
scid: &ConnectionId, dcid: &ConnectionId, new_scid: &ConnectionId,
token: &[u8], version: u32, out: &mut [u8],
) -> Result<usize> {
packet::retry(scid, dcid, new_scid, token, version, out)
}
#[inline]
pub fn version_is_supported(version: u32) -> bool {
matches!(version, PROTOCOL_VERSION_V1)
}
macro_rules! push_frame_to_pkt {
($out:expr, $frames:expr, $frame:expr, $left:expr) => {{
if $frame.wire_len() <= $left {
$left -= $frame.wire_len();
$frame.to_bytes(&mut $out)?;
$frames.push($frame);
true
} else {
false
}
}};
}
macro_rules! qlog_with_type {
($ty:expr, $qlog:expr, $qlog_streamer_ref:ident, $body:block) => {{
#[cfg(feature = "qlog")]
{
if EventImportance::from($ty).is_contained_in(&$qlog.level) {
if let Some($qlog_streamer_ref) = &mut $qlog.streamer {
$body
}
}
}
}};
}
#[cfg(feature = "qlog")]
const QLOG_PARAMS_SET: EventType =
EventType::QuicEventType(QuicEventType::ParametersSet);
#[cfg(feature = "qlog")]
const QLOG_PACKET_RX: EventType =
EventType::QuicEventType(QuicEventType::PacketReceived);
#[cfg(feature = "qlog")]
const QLOG_PACKET_TX: EventType =
EventType::QuicEventType(QuicEventType::PacketSent);
#[cfg(feature = "qlog")]
const QLOG_DATA_MV: EventType =
EventType::QuicEventType(QuicEventType::StreamDataMoved);
#[cfg(feature = "qlog")]
const QLOG_METRICS: EventType =
EventType::QuicEventType(QuicEventType::RecoveryMetricsUpdated);
#[cfg(feature = "qlog")]
const QLOG_CONNECTION_CLOSED: EventType =
EventType::QuicEventType(QuicEventType::ConnectionClosed);
#[cfg(feature = "qlog")]
struct QlogInfo {
streamer: Option<qlog::streamer::QlogStreamer>,
logged_peer_params: bool,
level: EventImportance,
}
#[cfg(feature = "qlog")]
impl Default for QlogInfo {
fn default() -> Self {
QlogInfo {
streamer: None,
logged_peer_params: false,
level: EventImportance::Base,
}
}
}
impl<F: BufFactory> Connection<F> {
fn new(
scid: &ConnectionId, retry_cids: Option<RetryConnectionIds>,
client_dcid: Option<&ConnectionId>, local: SocketAddr, peer: SocketAddr,
config: &mut Config, is_server: bool,
) -> Result<Connection<F>> {
let tls = config.tls_ctx.new_handshake()?;
Connection::with_tls(
scid,
retry_cids,
client_dcid,
local,
peer,
config,
tls,
is_server,
)
}
#[allow(clippy::too_many_arguments)]
fn with_tls(
scid: &ConnectionId, retry_cids: Option<RetryConnectionIds>,
client_dcid: Option<&ConnectionId>, local: SocketAddr, peer: SocketAddr,
config: &Config, tls: tls::Handshake, is_server: bool,
) -> Result<Connection<F>> {
if retry_cids.is_some() && client_dcid.is_some() {
return Err(Error::InvalidDcidInitialization);
}
#[cfg(feature = "custom-client-dcid")]
if let Some(client_dcid) = client_dcid {
if client_dcid.to_vec().len() < 8 {
return Err(Error::InvalidDcidInitialization);
}
}
#[cfg(not(feature = "custom-client-dcid"))]
if client_dcid.is_some() {
return Err(Error::InvalidDcidInitialization);
}
let max_rx_data = config.local_transport_params.initial_max_data;
let scid_as_hex: Vec<String> =
scid.iter().map(|b| format!("{b:02x}")).collect();
let reset_token = if is_server {
config.local_transport_params.stateless_reset_token
} else {
None
};
let recovery_config = recovery::RecoveryConfig::from_config(config);
let mut path = path::Path::new(
local,
peer,
&recovery_config,
config.path_challenge_recv_max_queue_len,
true,
Some(config),
);
path.verified_peer_address = retry_cids.is_some();
path.peer_verified_local_address = is_server;
let paths = path::PathMap::new(
path,
config.local_transport_params.active_conn_id_limit as usize,
is_server,
);
let active_path_id = paths.get_active_path_id()?;
let ids = cid::ConnectionIdentifiers::new(
config.local_transport_params.active_conn_id_limit as usize,
scid,
active_path_id,
reset_token,
);
let initial_flow_control_window =
if config.use_initial_max_data_as_flow_control_win {
max_rx_data
} else {
cmp::min(max_rx_data / 2 * 3, DEFAULT_CONNECTION_WINDOW)
};
let mut conn = Connection {
version: config.version,
ids,
trace_id: scid_as_hex.join(""),
pkt_num_spaces: [
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
packet::PktNumSpace::new(),
],
crypto_ctx: [
packet::CryptoContext::new(),
packet::CryptoContext::new(),
packet::CryptoContext::new(),
],
next_pkt_num: 0,
pkt_num_manager: packet::PktNumManager::new(),
peer_transport_params: TransportParams::default(),
peer_transport_params_track_unknown: config
.track_unknown_transport_params,
local_transport_params: config.local_transport_params.clone(),
handshake: tls,
session: None,
recovery_config,
paths,
path_challenge_recv_max_queue_len: config
.path_challenge_recv_max_queue_len,
path_challenge_rx_count: 0,
application_protos: config.application_protos.clone(),
recv_count: 0,
sent_count: 0,
lost_count: 0,
spurious_lost_count: 0,
retrans_count: 0,
dgram_sent_count: 0,
dgram_recv_count: 0,
sent_bytes: 0,
recv_bytes: 0,
acked_bytes: 0,
lost_bytes: 0,
rx_data: 0,
flow_control: flowcontrol::FlowControl::new(
max_rx_data,
initial_flow_control_window,
config.max_connection_window,
),
should_send_max_data: false,
should_send_max_streams_bidi: false,
should_send_max_streams_uni: false,
tx_cap: 0,
tx_cap_factor: config.tx_cap_factor,
tx_buffered: 0,
tx_buffered_state: TxBufferTrackingState::Ok,
tx_data: 0,
max_tx_data: 0,
last_tx_data: 0,
stream_retrans_bytes: 0,
streams: stream::StreamMap::new(
config.local_transport_params.initial_max_streams_bidi,
config.local_transport_params.initial_max_streams_uni,
config.max_stream_window,
),
odcid: None,
rscid: None,
token: None,
local_error: None,
peer_error: None,
blocked_limit: None,
idle_timer: None,
draining_timer: None,
undecryptable_pkts: VecDeque::new(),
alpn: Vec::new(),
is_server,
derived_initial_secrets: false,
did_version_negotiation: false,
did_retry: false,
got_peer_conn_id: false,
peer_verified_initial_address: is_server,
parsed_peer_transport_params: false,
handshake_completed: false,
handshake_done_sent: false,
handshake_done_acked: false,
handshake_confirmed: false,
key_phase: false,
ack_eliciting_sent: false,
closed: false,
timed_out: false,
grease: config.grease,
enable_send_streams_blocked: config.enable_send_streams_blocked,
keylog: None,
#[cfg(feature = "qlog")]
qlog: Default::default(),
dgram_recv_queue: dgram::DatagramQueue::new(
config.dgram_recv_max_queue_len,
),
dgram_send_queue: dgram::DatagramQueue::new(
config.dgram_send_max_queue_len,
),
emit_dgram: true,
disable_dcid_reuse: config.disable_dcid_reuse,
reset_stream_local_count: 0,
stopped_stream_local_count: 0,
reset_stream_remote_count: 0,
stopped_stream_remote_count: 0,
data_blocked_sent_count: 0,
stream_data_blocked_sent_count: 0,
data_blocked_recv_count: 0,
stream_data_blocked_recv_count: 0,
streams_blocked_bidi_recv_count: 0,
streams_blocked_uni_recv_count: 0,
streams_blocked_bidi_state: Default::default(),
streams_blocked_uni_state: Default::default(),
max_amplification_factor: config.max_amplification_factor,
};
conn.streams.set_use_initial_max_data_as_flow_control_win(
config.use_initial_max_data_as_flow_control_win,
);
if let Some(retry_cids) = retry_cids {
conn.local_transport_params
.original_destination_connection_id =
Some(retry_cids.original_destination_cid.to_vec().into());
conn.local_transport_params.retry_source_connection_id =
Some(retry_cids.retry_source_cid.to_vec().into());
conn.did_retry = true;
}
conn.local_transport_params.initial_source_connection_id =
Some(conn.ids.get_scid(0)?.cid.to_vec().into());
conn.handshake.init(is_server)?;
conn.handshake
.use_legacy_codepoint(config.version != PROTOCOL_VERSION_V1);
conn.encode_transport_params()?;
if !is_server {
let dcid = if let Some(client_dcid) = client_dcid {
client_dcid.to_vec()
} else {
let mut dcid = [0; 16];
rand::rand_bytes(&mut dcid[..]);
dcid.to_vec()
};
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&dcid,
conn.version,
conn.is_server,
false,
)?;
let reset_token = conn.peer_transport_params.stateless_reset_token;
conn.set_initial_dcid(
dcid.to_vec().into(),
reset_token,
active_path_id,
)?;
conn.crypto_ctx[packet::Epoch::Initial].crypto_open = Some(aead_open);
conn.crypto_ctx[packet::Epoch::Initial].crypto_seal = Some(aead_seal);
conn.derived_initial_secrets = true;
}
Ok(conn)
}
#[inline]
pub fn set_keylog(&mut self, writer: Box<dyn std::io::Write + Send + Sync>) {
self.keylog = Some(writer);
}
#[cfg(feature = "qlog")]
#[cfg_attr(docsrs, doc(cfg(feature = "qlog")))]
pub fn set_qlog(
&mut self, writer: Box<dyn std::io::Write + Send + Sync>, title: String,
description: String,
) {
self.set_qlog_with_level(writer, title, description, QlogLevel::Base)
}
#[cfg(feature = "qlog")]
#[cfg_attr(docsrs, doc(cfg(feature = "qlog")))]
pub fn set_qlog_with_level(
&mut self, writer: Box<dyn std::io::Write + Send + Sync>, title: String,
description: String, qlog_level: QlogLevel,
) {
use qlog::events::quic::TransportInitiator;
use qlog::events::HTTP3_URI;
use qlog::events::QUIC_URI;
use qlog::CommonFields;
use qlog::ReferenceTime;
let vp = if self.is_server {
qlog::VantagePointType::Server
} else {
qlog::VantagePointType::Client
};
let level = match qlog_level {
QlogLevel::Core => EventImportance::Core,
QlogLevel::Base => EventImportance::Base,
QlogLevel::Extra => EventImportance::Extra,
};
self.qlog.level = level;
let now = Instant::now();
let now_wall_clock = std::time::SystemTime::now();
let common_fields = CommonFields {
reference_time: ReferenceTime::new_monotonic(Some(now_wall_clock)),
..Default::default()
};
let trace = qlog::TraceSeq::new(
Some(title.to_string()),
Some(description.to_string()),
Some(common_fields),
Some(qlog::VantagePoint {
name: None,
ty: vp,
flow: None,
}),
vec![QUIC_URI.to_string(), HTTP3_URI.to_string()],
);
let mut streamer = qlog::streamer::QlogStreamer::new(
Some(title),
Some(description),
now,
trace,
self.qlog.level,
writer,
);
streamer.start_log().ok();
let ev_data = self
.local_transport_params
.to_qlog(TransportInitiator::Local, self.handshake.cipher());
streamer.add_event(Event::with_time(0.0, ev_data)).ok();
self.qlog.streamer = Some(streamer);
}
#[cfg(feature = "qlog")]
#[cfg_attr(docsrs, doc(cfg(feature = "qlog")))]
pub fn qlog_streamer(&mut self) -> Option<&mut qlog::streamer::QlogStreamer> {
self.qlog.streamer.as_mut()
}
#[inline]
pub fn set_session(&mut self, session: &[u8]) -> Result<()> {
let mut b = octets::Octets::with_slice(session);
let session_len = b.get_u64()? as usize;
let session_bytes = b.get_bytes(session_len)?;
self.handshake.set_session(session_bytes.as_ref())?;
let raw_params_len = b.get_u64()? as usize;
let raw_params_bytes = b.get_bytes(raw_params_len)?;
let peer_params = TransportParams::decode(
raw_params_bytes.as_ref(),
self.is_server,
self.peer_transport_params_track_unknown,
)?;
self.process_peer_transport_params(peer_params)?;
Ok(())
}
pub fn set_max_idle_timeout(&mut self, v: u64) -> Result<()> {
self.local_transport_params.max_idle_timeout =
cmp::min(v, octets::MAX_VAR_INT);
self.encode_transport_params()
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_cc_algorithm_in_handshake(
ssl: &mut boring::ssl::SslRef, algo: CongestionControlAlgorithm,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.cc_algorithm = algo;
Ok(())
}
#[cfg(all(feature = "boringssl-boring-crate", feature = "internal"))]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
#[doc(hidden)]
pub fn set_custom_bbr_settings_in_handshake(
ssl: &mut boring::ssl::SslRef, custom_bbr_params: BbrParams,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.custom_bbr_params = Some(custom_bbr_params);
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_cc_algorithm_name_in_handshake(
ssl: &mut boring::ssl::SslRef, name: &str,
) -> Result<()> {
let cc_algo = CongestionControlAlgorithm::from_str(name)?;
Self::set_cc_algorithm_in_handshake(ssl, cc_algo)
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_initial_congestion_window_packets_in_handshake(
ssl: &mut boring::ssl::SslRef, packets: usize,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.initial_congestion_window_packets = packets;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_enable_relaxed_loss_threshold_in_handshake(
ssl: &mut boring::ssl::SslRef, enable: bool,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.enable_relaxed_loss_threshold = enable;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_enable_cubic_idle_restart_fix_in_handshake(
ssl: &mut boring::ssl::SslRef, enable: bool,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.enable_cubic_idle_restart_fix = enable;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_hystart_in_handshake(
ssl: &mut boring::ssl::SslRef, v: bool,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.hystart = v;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_pacing_in_handshake(
ssl: &mut boring::ssl::SslRef, v: bool,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.pacing = v;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_max_pacing_rate_in_handshake(
ssl: &mut boring::ssl::SslRef, v: Option<u64>,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.max_pacing_rate = v;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_max_send_udp_payload_size_in_handshake(
ssl: &mut boring::ssl::SslRef, v: usize,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.recovery_config.max_send_udp_payload_size = v;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_send_capacity_factor_in_handshake(
ssl: &mut boring::ssl::SslRef, v: f64,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.tx_cap_factor = v;
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_discover_pmtu_in_handshake(
ssl: &mut boring::ssl::SslRef, discover: bool, max_probes: u8,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.pmtud = Some((discover, max_probes));
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_max_idle_timeout_in_handshake(
ssl: &mut boring::ssl::SslRef, v: u64,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.local_transport_params.max_idle_timeout = v;
Self::set_transport_parameters_in_hanshake(
ex_data.local_transport_params.clone(),
ex_data.is_server,
ssl,
)
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_initial_max_streams_bidi_in_handshake(
ssl: &mut boring::ssl::SslRef, v: u64,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.local_transport_params.initial_max_streams_bidi = v;
Self::set_transport_parameters_in_hanshake(
ex_data.local_transport_params.clone(),
ex_data.is_server,
ssl,
)
}
#[cfg(feature = "boringssl-boring-crate")]
fn set_transport_parameters_in_hanshake(
params: TransportParams, is_server: bool, ssl: &mut boring::ssl::SslRef,
) -> Result<()> {
use foreign_types_shared::ForeignTypeRef;
let mut handshake =
unsafe { tls::Handshake::from_ptr(ssl.as_ptr() as _) };
handshake.set_quic_transport_params(¶ms, is_server)?;
std::mem::forget(handshake);
Ok(())
}
#[cfg(feature = "boringssl-boring-crate")]
#[cfg_attr(docsrs, doc(cfg(feature = "boringssl-boring-crate")))]
pub fn set_use_initial_max_data_as_flow_control_win_in_handshake(
ssl: &mut boring::ssl::SslRef,
) -> Result<()> {
let ex_data = tls::ExData::from_ssl_ref(ssl).ok_or(Error::TlsFail)?;
ex_data.use_initial_max_data_as_flow_control_win = true;
Ok(())
}
pub fn recv(&mut self, buf: &mut [u8], info: RecvInfo) -> Result<usize> {
let len = buf.len();
if len == 0 {
return Err(Error::BufferTooShort);
}
let recv_pid = self.paths.path_id_from_addrs(&(info.to, info.from));
if let Some(recv_pid) = recv_pid {
let recv_path = self.paths.get_mut(recv_pid)?;
if self.is_server && !recv_path.verified_peer_address {
recv_path.max_send_bytes += len * self.max_amplification_factor;
}
} else if !self.is_server {
trace!(
"{} client received packet from unknown address {:?}, dropping",
self.trace_id,
info,
);
return Ok(len);
}
let mut done = 0;
let mut left = len;
while left > 0 {
let read = match self.recv_single(
&mut buf[len - left..len],
&info,
recv_pid,
) {
Ok(v) => v,
Err(Error::Done) => {
if self.is_stateless_reset(&buf[len - left..len]) {
trace!("{} packet is a stateless reset", self.trace_id);
self.mark_closed();
}
left
},
Err(e) => {
self.close(false, e.to_wire(), b"").ok();
return Err(e);
},
};
done += read;
left -= read;
}
self.process_undecrypted_0rtt_packets()?;
Ok(done)
}
fn process_undecrypted_0rtt_packets(&mut self) -> Result<()> {
if self.crypto_ctx[packet::Epoch::Application]
.crypto_0rtt_open
.is_some()
{
while let Some((mut pkt, info)) = self.undecryptable_pkts.pop_front()
{
if let Err(e) = self.recv(&mut pkt, info) {
self.undecryptable_pkts.clear();
return Err(e);
}
}
}
Ok(())
}
fn is_stateless_reset(&self, buf: &[u8]) -> bool {
let buf_len = buf.len();
if buf_len < 21 {
return false;
}
match self.peer_transport_params.stateless_reset_token {
Some(token) => {
let token_len = 16;
crypto::verify_slices_are_equal(
&token.to_be_bytes(),
&buf[buf_len - token_len..buf_len],
)
.is_ok()
},
None => false,
}
}
fn recv_single(
&mut self, buf: &mut [u8], info: &RecvInfo, recv_pid: Option<usize>,
) -> Result<usize> {
let now = Instant::now();
if buf.is_empty() {
return Err(Error::Done);
}
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
let is_closing = self.local_error.is_some();
if is_closing {
return Err(Error::Done);
}
let buf_len = buf.len();
let mut b = octets::OctetsMut::with_slice(buf);
let mut hdr = Header::from_bytes(&mut b, self.source_id().len())
.map_err(|e| {
drop_pkt_on_err(
e,
self.recv_count,
self.is_server,
&self.trace_id,
)
})?;
if hdr.ty == Type::VersionNegotiation {
if self.is_server {
return Err(Error::Done);
}
if self.did_version_negotiation {
return Err(Error::Done);
}
if self.recv_count > 0 {
return Err(Error::Done);
}
if hdr.dcid != self.source_id() {
return Err(Error::Done);
}
if hdr.scid != self.destination_id() {
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
let versions = hdr.versions.ok_or(Error::Done)?;
if versions.contains(&self.version) {
return Err(Error::Done);
}
let supported_versions =
versions.iter().filter(|&&v| version_is_supported(v));
let mut found_version = false;
for &v in supported_versions {
found_version = true;
if v == PROTOCOL_VERSION_V1 {
self.version = v;
break;
}
self.version = cmp::max(self.version, v);
}
if !found_version {
return Err(Error::UnknownVersion);
}
self.did_version_negotiation = true;
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&self.destination_id(),
self.version,
self.is_server,
true,
)?;
self.drop_epoch_state(packet::Epoch::Initial, now);
self.got_peer_conn_id = false;
self.handshake.clear()?;
self.crypto_ctx[packet::Epoch::Initial].crypto_open = Some(aead_open);
self.crypto_ctx[packet::Epoch::Initial].crypto_seal = Some(aead_seal);
self.handshake
.use_legacy_codepoint(self.version != PROTOCOL_VERSION_V1);
self.encode_transport_params()?;
return Err(Error::Done);
}
if hdr.ty == Type::Retry {
if self.is_server {
return Err(Error::Done);
}
if self.did_retry {
return Err(Error::Done);
}
if packet::verify_retry_integrity(
&b,
&self.destination_id(),
self.version,
)
.is_err()
{
return Err(Error::Done);
}
trace!("{} rx pkt {:?}", self.trace_id, hdr);
self.token = hdr.token;
self.did_retry = true;
self.odcid = Some(self.destination_id().into_owned());
self.set_initial_dcid(
hdr.scid.clone(),
None,
self.paths.get_active_path_id()?,
)?;
self.rscid = Some(self.destination_id().into_owned());
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&hdr.scid,
self.version,
self.is_server,
true,
)?;
self.drop_epoch_state(packet::Epoch::Initial, now);
self.got_peer_conn_id = false;
self.handshake.clear()?;
self.crypto_ctx[packet::Epoch::Initial].crypto_open = Some(aead_open);
self.crypto_ctx[packet::Epoch::Initial].crypto_seal = Some(aead_seal);
return Err(Error::Done);
}
if self.is_server && !self.did_version_negotiation {
if !version_is_supported(hdr.version) {
return Err(Error::UnknownVersion);
}
self.version = hdr.version;
self.did_version_negotiation = true;
self.handshake
.use_legacy_codepoint(self.version != PROTOCOL_VERSION_V1);
self.encode_transport_params()?;
}
if hdr.ty != Type::Short && hdr.version != self.version {
return Err(Error::Done);
}
let payload_len = if hdr.ty == Type::Short {
b.cap()
} else {
b.get_varint().map_err(|e| {
drop_pkt_on_err(
e.into(),
self.recv_count,
self.is_server,
&self.trace_id,
)
})? as usize
};
if payload_len > b.cap() {
return Err(drop_pkt_on_err(
Error::InvalidPacket,
self.recv_count,
self.is_server,
&self.trace_id,
));
}
if !self.derived_initial_secrets {
let (aead_open, aead_seal) = crypto::derive_initial_key_material(
&hdr.dcid,
self.version,
self.is_server,
false,
)?;
self.crypto_ctx[packet::Epoch::Initial].crypto_open = Some(aead_open);
self.crypto_ctx[packet::Epoch::Initial].crypto_seal = Some(aead_seal);
self.derived_initial_secrets = true;
}
let epoch = hdr.ty.to_epoch()?;
let aead = if hdr.ty == Type::ZeroRTT {
self.crypto_ctx[epoch].crypto_0rtt_open.as_ref()
} else {
self.crypto_ctx[epoch].crypto_open.as_ref()
};
let mut aead = match aead {
Some(v) => v,
None => {
if hdr.ty == Type::ZeroRTT &&
self.undecryptable_pkts.len() < MAX_UNDECRYPTABLE_PACKETS &&
!self.is_established()
{
let pkt_len = b.off() + payload_len;
let pkt = (b.buf()[..pkt_len]).to_vec();
self.undecryptable_pkts.push_back((pkt, *info));
return Ok(pkt_len);
}
let e = drop_pkt_on_err(
Error::CryptoFail,
self.recv_count,
self.is_server,
&self.trace_id,
);
return Err(e);
},
};
let aead_tag_len = aead.alg().tag_len();
packet::decrypt_hdr(&mut b, &mut hdr, aead).map_err(|e| {
drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
})?;
let pn = packet::decode_pkt_num(
self.pkt_num_spaces[epoch].largest_rx_pkt_num,
hdr.pkt_num,
hdr.pkt_num_len,
);
let pn_len = hdr.pkt_num_len;
trace!(
"{} rx pkt {:?} len={} pn={} {}",
self.trace_id,
hdr,
payload_len,
pn,
AddrTupleFmt(info.from, info.to)
);
#[cfg(feature = "qlog")]
let mut qlog_frames = vec![];
let mut aead_next = None;
if self.handshake_confirmed &&
hdr.ty != Type::ZeroRTT &&
hdr.key_phase != self.key_phase
{
if let Some(key_update) = self.crypto_ctx[epoch]
.key_update
.as_ref()
.and_then(|key_update| {
(pn < key_update.pn_on_update).then_some(key_update)
})
{
aead = &key_update.crypto_open;
} else {
trace!("{} peer-initiated key update", self.trace_id);
aead_next = Some((
self.crypto_ctx[epoch]
.crypto_open
.as_ref()
.unwrap()
.derive_next_packet_key()?,
self.crypto_ctx[epoch]
.crypto_seal
.as_ref()
.unwrap()
.derive_next_packet_key()?,
));
aead = &aead_next.as_ref().unwrap().0;
}
}
let mut payload = packet::decrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
aead,
)
.map_err(|e| {
drop_pkt_on_err(e, self.recv_count, self.is_server, &self.trace_id)
})?;
if self.pkt_num_spaces[epoch].recv_pkt_num.contains(pn) {
trace!("{} ignored duplicate packet {}", self.trace_id, pn);
return Err(Error::Done);
}
if payload.cap() == 0 {
return Err(Error::InvalidPacket);
}
let recv_pid = if hdr.ty == Type::Short && self.got_peer_conn_id {
let pkt_dcid = ConnectionId::from_ref(&hdr.dcid);
self.get_or_create_recv_path_id(recv_pid, &pkt_dcid, buf_len, info)?
} else {
self.paths.get_active_path_id()?
};
if let Some((open_next, seal_next)) = aead_next {
if !self.crypto_ctx[epoch]
.key_update
.as_ref()
.is_none_or(|prev| prev.update_acked)
{
return Err(Error::KeyUpdate);
}
trace!("{} key update verified", self.trace_id);
let _ = self.crypto_ctx[epoch].crypto_seal.replace(seal_next);
let open_prev = self.crypto_ctx[epoch]
.crypto_open
.replace(open_next)
.unwrap();
let recv_path = self.paths.get_mut(recv_pid)?;
self.crypto_ctx[epoch].key_update = Some(packet::KeyUpdate {
crypto_open: open_prev,
pn_on_update: pn,
update_acked: false,
timer: now + (recv_path.recovery.pto() * 3),
});
self.key_phase = !self.key_phase;
qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
let trigger = Some(
qlog::events::quic::KeyUpdateOrRetiredTrigger::RemoteUpdate,
);
let ev_data_client =
EventData::QuicKeyUpdated(qlog::events::quic::KeyUpdated {
key_type: qlog::events::quic::KeyType::Client1RttSecret,
trigger: trigger.clone(),
..Default::default()
});
q.add_event_data_with_instant(ev_data_client, now).ok();
let ev_data_server =
EventData::QuicKeyUpdated(qlog::events::quic::KeyUpdated {
key_type: qlog::events::quic::KeyType::Server1RttSecret,
trigger,
..Default::default()
});
q.add_event_data_with_instant(ev_data_server, now).ok();
});
}
if !self.is_server && !self.got_peer_conn_id {
if self.odcid.is_none() {
self.odcid = Some(self.destination_id().into_owned());
}
self.set_initial_dcid(
hdr.scid.clone(),
self.peer_transport_params.stateless_reset_token,
recv_pid,
)?;
self.got_peer_conn_id = true;
}
if self.is_server && !self.got_peer_conn_id {
self.set_initial_dcid(hdr.scid.clone(), None, recv_pid)?;
if !self.did_retry {
self.local_transport_params
.original_destination_connection_id =
Some(hdr.dcid.to_vec().into());
self.encode_transport_params()?;
}
self.got_peer_conn_id = true;
}
let mut ack_elicited = false;
let mut frame_processing_err = None;
let mut probing = true;
while payload.cap() > 0 {
let frame = frame::Frame::from_bytes(&mut payload, hdr.ty)?;
qlog_with_type!(QLOG_PACKET_RX, self.qlog, _q, {
qlog_frames.push(frame.to_qlog());
});
if frame.ack_eliciting() {
ack_elicited = true;
}
if !frame.probing() {
probing = false;
}
if let Err(e) = self.process_frame(frame, &hdr, recv_pid, epoch, now)
{
frame_processing_err = Some(e);
break;
}
}
qlog_with_type!(QLOG_PACKET_RX, self.qlog, q, {
let packet_size = b.len();
let qlog_pkt_hdr = qlog::events::quic::PacketHeader::with_type(
hdr.ty.to_qlog(),
Some(pn),
Some(hdr.version),
Some(&hdr.scid),
Some(&hdr.dcid),
);
let qlog_raw_info = RawInfo {
length: Some(packet_size as u64),
payload_length: Some(payload_len as u64),
data: None,
};
let ev_data = EventData::QuicPacketReceived(
qlog::events::quic::PacketReceived {
header: qlog_pkt_hdr,
frames: Some(qlog_frames),
raw: Some(qlog_raw_info),
..Default::default()
},
);
q.add_event_data_with_instant(ev_data, now).ok();
});
qlog_with_type!(QLOG_METRICS, self.qlog, q, {
let recv_path = self.paths.get_mut(recv_pid)?;
recv_path.recovery.maybe_qlog(q, now);
});
if let Some(e) = frame_processing_err {
return Err(e);
}
if self.is_established() {
qlog_with_type!(QLOG_PARAMS_SET, self.qlog, q, {
if !self.qlog.logged_peer_params {
let ev_data = self.peer_transport_params.to_qlog(
TransportInitiator::Remote,
self.handshake.cipher(),
);
q.add_event_data_with_instant(ev_data, now).ok();
self.qlog.logged_peer_params = true;
}
});
}
for (_, p) in self.paths.iter_mut() {
while let Some(acked) = p.recovery.next_acked_frame(epoch) {
match acked {
frame::Frame::Ping {
mtu_probe: Some(mtu_probe),
} => {
if let Some(pmtud) = p.pmtud.as_mut() {
trace!(
"{} pmtud probe acked; probe size {:?}",
self.trace_id,
mtu_probe
);
if let Some(current_mtu) =
pmtud.successful_probe(mtu_probe)
{
qlog_with_type!(
EventType::QuicEventType(
QuicEventType::MtuUpdated
),
self.qlog,
q,
{
let pmtu_data = EventData::QuicMtuUpdated(
qlog::events::quic::MtuUpdated {
old: Some(
p.recovery.max_datagram_size()
as u32,
),
new: current_mtu as u32,
done: Some(true),
},
);
q.add_event_data_with_instant(
pmtu_data, now,
)
.ok();
}
);
p.recovery
.pmtud_update_max_datagram_size(current_mtu);
}
}
},
frame::Frame::ACK { ranges, .. } => {
if let Some(largest_acked) = ranges.last() {
self.pkt_num_spaces[epoch]
.recv_pkt_need_ack
.remove_until(largest_acked);
}
},
frame::Frame::CryptoHeader { offset, length } => {
self.crypto_ctx[epoch]
.crypto_stream
.send
.ack_and_drop(offset, length);
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
..
} => {
self.tx_buffered =
self.tx_buffered.saturating_sub(length);
qlog_with_type!(QLOG_DATA_MV, self.qlog, q, {
let ev_data = EventData::QuicStreamDataMoved(
qlog::events::quic::StreamDataMoved {
stream_id: Some(stream_id),
offset: Some(offset),
raw: Some(RawInfo {
length: Some(length as u64),
..Default::default()
}),
from: Some(DataRecipient::Transport),
to: Some(DataRecipient::Dropped),
..Default::default()
},
);
q.add_event_data_with_instant(ev_data, now).ok();
});
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
stream.send.ack_and_drop(offset, length);
let priority_key = Arc::clone(&stream.priority_key);
let is_writable = priority_key.writable.is_linked() &&
stream.send.is_stopped();
let is_complete = stream.is_complete();
let is_readable = stream.is_readable();
if is_complete && !is_readable && !is_writable {
let local = stream.local;
self.streams.collect(stream_id, local);
}
},
frame::Frame::HandshakeDone => {
self.handshake_done_sent = true;
self.handshake_done_acked = true;
},
frame::Frame::ResetStream { stream_id, .. } => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => continue,
};
let priority_key = Arc::clone(&stream.priority_key);
let is_writable = priority_key.writable.is_linked() &&
stream.send.is_stopped();
let is_complete = stream.is_complete();
let is_readable = stream.is_readable();
if is_complete && !is_readable && !is_writable {
let local = stream.local;
self.streams.collect(stream_id, local);
}
},
_ => (),
}
}
}
let no_dcid = self
.paths
.iter_mut()
.filter(|(_, p)| p.active_dcid_seq.is_none());
for (pid, p) in no_dcid {
if self.ids.zero_length_dcid() {
p.active_dcid_seq = Some(0);
continue;
}
let dcid_seq = match self.ids.lowest_available_dcid_seq() {
Some(seq) => seq,
None => break,
};
self.ids.link_dcid_to_path_id(dcid_seq, pid)?;
p.active_dcid_seq = Some(dcid_seq);
}
if self.pkt_num_spaces[epoch].recv_pkt_need_ack.last() < Some(pn) {
self.pkt_num_spaces[epoch].largest_rx_pkt_time = now;
}
self.pkt_num_spaces[epoch].recv_pkt_num.insert(pn);
self.pkt_num_spaces[epoch].recv_pkt_need_ack.push_item(pn);
self.pkt_num_spaces[epoch].ack_elicited =
cmp::max(self.pkt_num_spaces[epoch].ack_elicited, ack_elicited);
self.pkt_num_spaces[epoch].largest_rx_pkt_num =
cmp::max(self.pkt_num_spaces[epoch].largest_rx_pkt_num, pn);
if !probing {
self.pkt_num_spaces[epoch].largest_rx_non_probing_pkt_num = cmp::max(
self.pkt_num_spaces[epoch].largest_rx_non_probing_pkt_num,
pn,
);
let active_path_id = self.paths.get_active_path_id()?;
if self.is_server &&
recv_pid != active_path_id &&
self.pkt_num_spaces[epoch].largest_rx_non_probing_pkt_num == pn
{
self.on_peer_migrated(recv_pid, self.disable_dcid_reuse, now)?;
}
}
if let Some(idle_timeout) = self.idle_timeout() {
self.idle_timer = Some(now + idle_timeout);
}
self.update_tx_cap();
self.recv_count += 1;
self.paths.get_mut(recv_pid)?.recv_count += 1;
let read = b.off() + aead_tag_len;
self.recv_bytes += read as u64;
self.paths.get_mut(recv_pid)?.recv_bytes += read as u64;
if self.is_server && hdr.ty == Type::Handshake {
self.drop_epoch_state(packet::Epoch::Initial, now);
self.paths.get_mut(recv_pid)?.verified_peer_address = true;
}
self.ack_eliciting_sent = false;
Ok(read)
}
pub fn send(&mut self, out: &mut [u8]) -> Result<(usize, SendInfo)> {
self.send_on_path(out, None, None)
}
pub fn send_on_path(
&mut self, out: &mut [u8], from: Option<SocketAddr>,
to: Option<SocketAddr>,
) -> Result<(usize, SendInfo)> {
if out.is_empty() {
return Err(Error::BufferTooShort);
}
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
let now = Instant::now();
if self.local_error.is_none() {
self.do_handshake(now)?;
}
let _ = self.process_undecrypted_0rtt_packets();
if !self.derived_initial_secrets {
return Err(Error::Done);
}
let mut has_initial = false;
let mut done = 0;
let mut left = cmp::min(out.len(), self.max_send_udp_payload_size());
let send_pid = match (from, to) {
(Some(f), Some(t)) => self
.paths
.path_id_from_addrs(&(f, t))
.ok_or(Error::InvalidState)?,
_ => self.get_send_path_id(from, to)?,
};
let send_path = self.paths.get_mut(send_pid)?;
if let Some(pmtud) = send_path.pmtud.as_mut() {
if pmtud.should_probe() {
let size = if self.handshake_confirmed || self.handshake_completed
{
pmtud.get_probe_size()
} else {
pmtud.get_current_mtu()
};
send_path.recovery.pmtud_update_max_datagram_size(size);
left =
cmp::min(out.len(), send_path.recovery.max_datagram_size());
}
}
if !send_path.verified_peer_address && self.is_server {
left = cmp::min(left, send_path.max_send_bytes);
}
while left > 0 {
let (ty, written) = match self.send_single(
&mut out[done..done + left],
send_pid,
has_initial,
now,
) {
Ok(v) => v,
Err(Error::BufferTooShort) | Err(Error::Done) => break,
Err(e) => return Err(e),
};
done += written;
left -= written;
match ty {
Type::Initial => has_initial = true,
Type::Short => break,
_ => (),
};
if let Ok(epoch) = ty.to_epoch() {
if self.paths.get_mut(send_pid)?.recovery.loss_probes(epoch) > 0 {
break;
}
}
if !(from.is_some() && to.is_some()) &&
self.get_send_path_id(from, to)? != send_pid
{
break;
}
}
if done == 0 {
self.last_tx_data = self.tx_data;
return Err(Error::Done);
}
if has_initial && left > 0 && done < MIN_CLIENT_INITIAL_LEN {
let pad_len = cmp::min(left, MIN_CLIENT_INITIAL_LEN - done);
out[done..done + pad_len].fill(0);
done += pad_len;
}
let send_path = self.paths.get(send_pid)?;
let info = SendInfo {
from: send_path.local_addr(),
to: send_path.peer_addr(),
at: send_path.recovery.get_packet_send_time(now),
};
Ok((done, info))
}
fn send_single(
&mut self, out: &mut [u8], send_pid: usize, has_initial: bool,
now: Instant,
) -> Result<(Type, usize)> {
if out.is_empty() {
return Err(Error::BufferTooShort);
}
if self.is_draining() {
return Err(Error::Done);
}
let is_closing = self.local_error.is_some();
let out_len = out.len();
let mut b = octets::OctetsMut::with_slice(out);
let pkt_type = self.write_pkt_type(send_pid)?;
let max_dgram_len = if !self.dgram_send_queue.is_empty() {
self.dgram_max_writable_len()
} else {
None
};
let epoch = pkt_type.to_epoch()?;
let pkt_space = &mut self.pkt_num_spaces[epoch];
let crypto_ctx = &mut self.crypto_ctx[epoch];
for (_, p) in self.paths.iter_mut() {
while let Some(lost) = p.recovery.next_lost_frame(epoch) {
match lost {
frame::Frame::CryptoHeader { offset, length } => {
crypto_ctx.crypto_stream.send.retransmit(offset, length);
self.stream_retrans_bytes += length as u64;
p.stream_retrans_bytes += length as u64;
self.retrans_count += 1;
p.retrans_count += 1;
},
frame::Frame::StreamHeader {
stream_id,
offset,
length,
fin,
} => {
let stream = match self.streams.get_mut(stream_id) {
Some(v) if !v.send.is_stopped() => v,
_ => {
self.tx_buffered =
self.tx_buffered.saturating_sub(length);
qlog_with_type!(QLOG_DATA_MV, self.qlog, q, {
let ev_data = EventData::QuicStreamDataMoved(
qlog::events::quic::StreamDataMoved {
stream_id: Some(stream_id),
offset: Some(offset),
raw: Some(RawInfo {
length: Some(length as u64),
..Default::default()
}),
from: Some(DataRecipient::Transport),
to: Some(DataRecipient::Dropped),
..Default::default()
},
);
q.add_event_data_with_instant(ev_data, now)
.ok();
});
continue;
},
};
let was_flushable = stream.is_flushable();
let empty_fin = length == 0 && fin;
stream.send.retransmit(offset, length);
if (stream.is_flushable() || empty_fin) && !was_flushable
{
let priority_key = Arc::clone(&stream.priority_key);
self.streams.insert_flushable(&priority_key);
}
self.stream_retrans_bytes += length as u64;
p.stream_retrans_bytes += length as u64;
self.retrans_count += 1;
p.retrans_count += 1;
},
frame::Frame::ACK { .. } => {
pkt_space.ack_elicited = true;
},
frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
} => {
self.streams
.insert_reset(stream_id, error_code, final_size);
},
frame::Frame::StopSending {
stream_id,
error_code,
} =>
if let Some(stream) = self.streams.get(stream_id) {
if !stream.recv.is_fin() {
self.streams
.insert_stopped(stream_id, error_code);
}
},
frame::Frame::HandshakeDone if !self.handshake_done_acked => {
self.handshake_done_sent = false;
},
frame::Frame::MaxStreamData { stream_id, .. } => {
if self.streams.get(stream_id).is_some() {
self.streams.insert_almost_full(stream_id);
}
},
frame::Frame::MaxData { .. } => {
self.should_send_max_data = true;
},
frame::Frame::MaxStreamsUni { .. } => {
self.should_send_max_streams_uni = true;
},
frame::Frame::MaxStreamsBidi { .. } => {
self.should_send_max_streams_bidi = true;
},
frame::Frame::StreamsBlockedBidi { limit } => {
self.streams_blocked_bidi_state
.force_retransmit_sent_limit_eq(limit);
},
frame::Frame::StreamsBlockedUni { limit } => {
self.streams_blocked_uni_state
.force_retransmit_sent_limit_eq(limit);
},
frame::Frame::NewConnectionId { seq_num, .. } => {
self.ids.mark_advertise_new_scid_seq(seq_num, true);
},
frame::Frame::RetireConnectionId { seq_num } => {
self.ids.mark_retire_dcid_seq(seq_num, true)?;
},
frame::Frame::Ping {
mtu_probe: Some(failed_probe),
} =>
if let Some(pmtud) = p.pmtud.as_mut() {
trace!("pmtud probe dropped: {failed_probe}");
pmtud.failed_probe(failed_probe);
},
_ => (),
}
}
}
self.check_tx_buffered_invariant();
let is_app_limited = self.delivery_rate_check_if_app_limited();
let n_paths = self.paths.len();
let path = self.paths.get_mut(send_pid)?;
let flow_control = &mut self.flow_control;
let pkt_space = &mut self.pkt_num_spaces[epoch];
let crypto_ctx = &mut self.crypto_ctx[epoch];
let pkt_num_manager = &mut self.pkt_num_manager;
let mut left = if let Some(pmtud) = path.pmtud.as_mut() {
cmp::min(pmtud.get_current_mtu(), b.cap())
} else {
b.cap()
};
if pkt_num_manager.should_skip_pn(self.handshake_completed) {
pkt_num_manager.set_skip_pn(Some(self.next_pkt_num));
self.next_pkt_num += 1;
};
let pn = self.next_pkt_num;
let largest_acked_pkt =
path.recovery.get_largest_acked_on_epoch(epoch).unwrap_or(0);
let pn_len = packet::pkt_num_len(pn, largest_acked_pkt);
let crypto_overhead = crypto_ctx.crypto_overhead().ok_or(Error::Done)?;
let dcid_seq = path.active_dcid_seq.ok_or(Error::OutOfIdentifiers)?;
let dcid =
ConnectionId::from_ref(self.ids.get_dcid(dcid_seq)?.cid.as_ref());
let scid = if let Some(scid_seq) = path.active_scid_seq {
ConnectionId::from_ref(self.ids.get_scid(scid_seq)?.cid.as_ref())
} else if pkt_type == Type::Short {
ConnectionId::default()
} else {
return Err(Error::InvalidState);
};
let hdr = Header {
ty: pkt_type,
version: self.version,
dcid,
scid,
pkt_num: 0,
pkt_num_len: pn_len,
token: if pkt_type == Type::Initial {
self.token.clone()
} else {
None
},
versions: None,
key_phase: self.key_phase,
};
hdr.to_bytes(&mut b)?;
let hdr_trace = if log::max_level() == log::LevelFilter::Trace {
Some(format!("{hdr:?}"))
} else {
None
};
let hdr_ty = hdr.ty;
#[cfg(feature = "qlog")]
let qlog_pkt_hdr = self.qlog.streamer.as_ref().map(|_q| {
qlog::events::quic::PacketHeader::with_type(
hdr.ty.to_qlog(),
Some(pn),
Some(hdr.version),
Some(&hdr.scid),
Some(&hdr.dcid),
)
});
let mut overhead = b.off() + pn_len + crypto_overhead;
if pkt_type != Type::Short {
overhead += PAYLOAD_LENGTH_LEN;
}
match left.checked_sub(overhead) {
Some(v) => left = v,
None => {
path.recovery.update_app_limited(false);
return Err(Error::Done);
},
}
if left < PAYLOAD_MIN_LEN {
path.recovery.update_app_limited(false);
return Err(Error::Done);
}
let mut frames: SmallVec<[frame::Frame; 1]> = SmallVec::new();
let mut ack_eliciting = false;
let mut in_flight = false;
let mut is_pmtud_probe = false;
let mut has_data = false;
let ack_elicit_required = path.recovery.should_elicit_ack(epoch);
let header_offset = b.off();
if pkt_type != Type::Short {
b.skip(PAYLOAD_LENGTH_LEN)?;
}
packet::encode_pkt_num(pn, pn_len, &mut b)?;
let payload_offset = b.off();
let cwnd_available =
path.recovery.cwnd_available().saturating_sub(overhead);
let left_before_packing_ack_frame = left;
if pkt_space.recv_pkt_need_ack.len() > 0 &&
(pkt_space.ack_elicited || ack_elicit_required) &&
(!is_closing ||
(pkt_type == Type::Handshake &&
self.local_error
.as_ref()
.is_some_and(|le| le.is_app))) &&
path.active()
{
#[cfg(not(feature = "fuzzing"))]
let ack_delay = pkt_space.largest_rx_pkt_time.elapsed();
#[cfg(not(feature = "fuzzing"))]
let ack_delay = ack_delay.as_micros() as u64 /
2_u64
.pow(self.local_transport_params.ack_delay_exponent as u32);
#[cfg(feature = "fuzzing")]
let ack_delay = rand::rand_u8() as u64 + 1;
let frame = frame::Frame::ACK {
ack_delay,
ranges: pkt_space.recv_pkt_need_ack.clone(),
ecn_counts: None, };
if pkt_space.ack_elicited || frame.wire_len() < cwnd_available {
if push_frame_to_pkt!(b, frames, frame, left) {
pkt_space.ack_elicited = false;
}
}
}
left = cmp::min(
left,
cwnd_available.saturating_sub(left_before_packing_ack_frame - left),
);
let mut challenge_data = None;
if pkt_type == Type::Short {
if let Ok(active_path) = self.paths.get_active_mut() {
let should_probe_pmtu = active_path.should_send_pmtu_probe(
self.handshake_confirmed,
self.handshake_completed,
out_len,
is_closing,
frames.is_empty(),
);
if should_probe_pmtu {
if let Some(pmtud) = active_path.pmtud.as_mut() {
let probe_size = pmtud.get_probe_size();
trace!(
"{} sending pmtud probe pmtu_probe={} estimated_pmtu={}",
self.trace_id,
probe_size,
pmtud.get_current_mtu(),
);
left = probe_size;
match left.checked_sub(overhead) {
Some(v) => left = v,
None => {
active_path.recovery.update_app_limited(false);
return Err(Error::Done);
},
}
let frame = frame::Frame::Padding {
len: probe_size - overhead - 1,
};
if push_frame_to_pkt!(b, frames, frame, left) {
let frame = frame::Frame::Ping {
mtu_probe: Some(probe_size),
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
}
pmtud.set_in_flight(true);
is_pmtud_probe = true;
}
}
}
let path = self.paths.get_mut(send_pid)?;
while let Some(challenge) = path.pop_received_challenge() {
let frame = frame::Frame::PathResponse { data: challenge };
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
} else {
break;
}
}
if path.validation_requested() {
let data = rand::rand_u64().to_be_bytes();
let frame = frame::Frame::PathChallenge { data };
if push_frame_to_pkt!(b, frames, frame, left) {
challenge_data = Some(data);
ack_eliciting = true;
in_flight = true;
}
}
if let Some(key_update) = crypto_ctx.key_update.as_mut() {
key_update.update_acked = true;
}
}
let path = self.paths.get_mut(send_pid)?;
if pkt_type == Type::Short && !is_closing {
while let Some(seq_num) = self.ids.next_advertise_new_scid_seq() {
let frame = self.ids.get_new_connection_id_frame_for(seq_num)?;
if push_frame_to_pkt!(b, frames, frame, left) {
self.ids.mark_advertise_new_scid_seq(seq_num, false);
ack_eliciting = true;
in_flight = true;
} else {
break;
}
}
}
if pkt_type == Type::Short && !is_closing && path.active() {
if self.handshake_completed &&
!self.handshake_done_sent &&
self.is_server
{
let frame = frame::Frame::HandshakeDone;
if push_frame_to_pkt!(b, frames, frame, left) {
self.handshake_done_sent = true;
ack_eliciting = true;
in_flight = true;
}
}
if self.streams.should_update_max_streams_bidi() ||
self.should_send_max_streams_bidi
{
let frame = frame::Frame::MaxStreamsBidi {
max: self.streams.max_streams_bidi_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_bidi();
self.should_send_max_streams_bidi = false;
ack_eliciting = true;
in_flight = true;
}
}
if self.streams.should_update_max_streams_uni() ||
self.should_send_max_streams_uni
{
let frame = frame::Frame::MaxStreamsUni {
max: self.streams.max_streams_uni_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.update_max_streams_uni();
self.should_send_max_streams_uni = false;
ack_eliciting = true;
in_flight = true;
}
}
if let Some(limit) = self.blocked_limit {
let frame = frame::Frame::DataBlocked { limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.blocked_limit = None;
self.data_blocked_sent_count =
self.data_blocked_sent_count.saturating_add(1);
ack_eliciting = true;
in_flight = true;
}
}
if self
.streams_blocked_bidi_state
.has_pending_stream_blocked_frame()
{
if let Some(limit) = self.streams_blocked_bidi_state.blocked_at {
let frame = frame::Frame::StreamsBlockedBidi { limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams_blocked_bidi_state.blocked_sent =
Some(limit);
ack_eliciting = true;
in_flight = true;
}
}
}
if self
.streams_blocked_uni_state
.has_pending_stream_blocked_frame()
{
if let Some(limit) = self.streams_blocked_uni_state.blocked_at {
let frame = frame::Frame::StreamsBlockedUni { limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams_blocked_uni_state.blocked_sent = Some(limit);
ack_eliciting = true;
in_flight = true;
}
}
}
for stream_id in self.streams.almost_full() {
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => {
self.streams.remove_almost_full(stream_id);
continue;
},
};
if stream.recv.almost_full() {
stream.recv.autotune_window(now, path.recovery.rtt());
}
let frame = frame::Frame::MaxStreamData {
stream_id,
max: stream.recv.max_data_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
let recv_win = stream.recv.window();
stream.recv.update_max_data(now);
self.streams.remove_almost_full(stream_id);
ack_eliciting = true;
in_flight = true;
flow_control.ensure_window_lower_bound(
(recv_win as f64 * CONNECTION_WINDOW_FACTOR) as u64,
);
}
}
if flow_control.should_update_max_data() &&
flow_control.max_data() < flow_control.max_data_next()
{
flow_control.autotune_window(now, path.recovery.rtt());
self.should_send_max_data = true;
}
if self.should_send_max_data {
let frame = frame::Frame::MaxData {
max: flow_control.max_data_next(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.should_send_max_data = false;
flow_control.update_max_data(now);
ack_eliciting = true;
in_flight = true;
}
}
for (stream_id, error_code) in self
.streams
.stopped()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StopSending {
stream_id,
error_code,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.remove_stopped(stream_id);
ack_eliciting = true;
in_flight = true;
}
}
for (stream_id, (error_code, final_size)) in self
.streams
.reset()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, (u64, u64))>>()
{
let frame = frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
};
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.remove_reset(stream_id);
ack_eliciting = true;
in_flight = true;
}
}
for (stream_id, limit) in self
.streams
.blocked()
.map(|(&k, &v)| (k, v))
.collect::<Vec<(u64, u64)>>()
{
let frame = frame::Frame::StreamDataBlocked { stream_id, limit };
if push_frame_to_pkt!(b, frames, frame, left) {
self.streams.remove_blocked(stream_id);
self.stream_data_blocked_sent_count =
self.stream_data_blocked_sent_count.saturating_add(1);
ack_eliciting = true;
in_flight = true;
}
}
let retire_dcid_seqs = self.ids.retire_dcid_seqs();
for seq_num in retire_dcid_seqs {
let dcid_seq = path.active_dcid_seq.ok_or(Error::InvalidState)?;
if seq_num == dcid_seq {
continue;
}
let frame = frame::Frame::RetireConnectionId { seq_num };
if push_frame_to_pkt!(b, frames, frame, left) {
self.ids.mark_retire_dcid_seq(seq_num, false)?;
ack_eliciting = true;
in_flight = true;
} else {
break;
}
}
}
if path.active() || n_paths == 1 {
if let Some(conn_err) = self.local_error.as_ref() {
if conn_err.is_app {
if pkt_type == Type::Short {
let frame = frame::Frame::ApplicationClose {
error_code: conn_err.error_code,
reason: conn_err.reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
let pto = path.recovery.pto();
self.draining_timer = Some(now + (pto * 3));
ack_eliciting = true;
in_flight = true;
}
}
} else {
let frame = frame::Frame::ConnectionClose {
error_code: conn_err.error_code,
frame_type: 0,
reason: conn_err.reason.clone(),
};
if push_frame_to_pkt!(b, frames, frame, left) {
let pto = path.recovery.pto();
self.draining_timer = Some(now + (pto * 3));
ack_eliciting = true;
in_flight = true;
}
}
}
}
if crypto_ctx.crypto_stream.is_flushable() &&
left > frame::MAX_CRYPTO_OVERHEAD &&
!is_closing &&
path.active()
{
let crypto_off = crypto_ctx.crypto_stream.send.off_front();
let hdr_off = b.off();
let hdr_len = 1 + octets::varint_len(crypto_off) + 2;
if let Some(max_len) = left.checked_sub(hdr_len) {
let (mut crypto_hdr, mut crypto_payload) =
b.split_at(hdr_off + hdr_len)?;
let (len, _) = crypto_ctx
.crypto_stream
.send
.emit(&mut crypto_payload.as_mut()[..max_len])?;
crypto_hdr.skip(hdr_off)?;
frame::encode_crypto_header(
crypto_off,
len as u64,
&mut crypto_hdr,
)?;
b.skip(hdr_len + len)?;
let frame = frame::Frame::CryptoHeader {
offset: crypto_off,
length: len,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
}
}
let mut dgram_emitted = false;
let dgrams_to_emit = max_dgram_len.is_some();
let stream_to_emit = self.streams.has_flushable();
let mut do_dgram = self.emit_dgram && dgrams_to_emit;
let do_stream = !self.emit_dgram && stream_to_emit;
if !do_stream && dgrams_to_emit {
do_dgram = true;
}
if (pkt_type == Type::Short || pkt_type == Type::ZeroRTT) &&
left > frame::MAX_DGRAM_OVERHEAD &&
!is_closing &&
path.active() &&
do_dgram
{
if let Some(max_dgram_payload) = max_dgram_len {
while let Some(len) = self.dgram_send_queue.peek_front_len() {
let hdr_off = b.off();
let hdr_len = 1 + 2;
if (hdr_len + len) <= left {
match self.dgram_send_queue.pop() {
Some(data) => {
let (mut dgram_hdr, mut dgram_payload) =
b.split_at(hdr_off + hdr_len)?;
dgram_payload.as_mut()[..len]
.copy_from_slice(data.as_ref());
dgram_hdr.skip(hdr_off)?;
frame::encode_dgram_header(
len as u64,
&mut dgram_hdr,
)?;
b.skip(hdr_len + len)?;
let frame =
frame::Frame::DatagramHeader { length: len };
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
dgram_emitted = true;
self.dgram_sent_count =
self.dgram_sent_count.saturating_add(1);
path.dgram_sent_count =
path.dgram_sent_count.saturating_add(1);
}
},
None => continue,
};
} else if len > max_dgram_payload {
self.dgram_send_queue.pop();
} else {
break;
}
}
}
}
if (pkt_type == Type::Short || pkt_type == Type::ZeroRTT) &&
left > frame::MAX_STREAM_OVERHEAD &&
!is_closing &&
path.active() &&
!dgram_emitted
{
while let Some(priority_key) = self.streams.peek_flushable() {
let stream_id = priority_key.id;
let stream = match self.streams.get_mut(stream_id) {
Some(v) if !v.send.is_stopped() => v,
_ => {
self.streams.remove_flushable(&priority_key);
continue;
},
};
let stream_off = stream.send.off_front();
let hdr_off = b.off();
let hdr_len = 1 + octets::varint_len(stream_id) + octets::varint_len(stream_off) + 2;
let max_len = match left.checked_sub(hdr_len) {
Some(v) => v,
None => {
let priority_key = Arc::clone(&stream.priority_key);
self.streams.remove_flushable(&priority_key);
continue;
},
};
let (mut stream_hdr, mut stream_payload) =
b.split_at(hdr_off + hdr_len)?;
let (len, fin) =
stream.send.emit(&mut stream_payload.as_mut()[..max_len])?;
stream_hdr.skip(hdr_off)?;
frame::encode_stream_header(
stream_id,
stream_off,
len as u64,
fin,
&mut stream_hdr,
)?;
b.skip(hdr_len + len)?;
let frame = frame::Frame::StreamHeader {
stream_id,
offset: stream_off,
length: len,
fin,
};
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
has_data = true;
}
let priority_key = Arc::clone(&stream.priority_key);
if !stream.is_flushable() {
self.streams.remove_flushable(&priority_key);
} else if stream.incremental {
self.streams.remove_flushable(&priority_key);
self.streams.insert_flushable(&priority_key);
}
#[cfg(feature = "fuzzing")]
if left > frame::MAX_STREAM_OVERHEAD {
continue;
}
break;
}
}
self.emit_dgram = !dgram_emitted;
if (ack_elicit_required || path.needs_ack_eliciting) &&
!ack_eliciting &&
left >= 1 &&
!is_closing
{
let frame = frame::Frame::Ping { mtu_probe: None };
if push_frame_to_pkt!(b, frames, frame, left) {
ack_eliciting = true;
in_flight = true;
}
}
if ack_eliciting && !is_pmtud_probe {
path.needs_ack_eliciting = false;
path.recovery.ping_sent(epoch);
}
if !has_data &&
!dgram_emitted &&
cwnd_available > frame::MAX_STREAM_OVERHEAD
{
path.recovery.on_app_limited();
}
if frames.is_empty() {
path.recovery.update_app_limited(false);
return Err(Error::Done);
}
if (has_initial || !path.validated()) &&
pkt_type == Type::Short &&
left >= 1
{
let frame = frame::Frame::Padding { len: left };
if push_frame_to_pkt!(b, frames, frame, left) {
in_flight = true;
}
}
if b.off() - payload_offset < PAYLOAD_MIN_LEN {
let payload_len = b.off() - payload_offset;
let frame = frame::Frame::Padding {
len: PAYLOAD_MIN_LEN - payload_len,
};
#[allow(unused_assignments)]
if push_frame_to_pkt!(b, frames, frame, left) {
in_flight = true;
}
}
let payload_len = b.off() - payload_offset;
if pkt_type != Type::Short {
let len = pn_len + payload_len + crypto_overhead;
let (_, mut payload_with_len) = b.split_at(header_offset)?;
payload_with_len
.put_varint_with_len(len as u64, PAYLOAD_LENGTH_LEN)?;
}
trace!(
"{} tx pkt {} len={} pn={} {}",
self.trace_id,
hdr_trace.unwrap_or_default(),
payload_len,
pn,
AddrTupleFmt(path.local_addr(), path.peer_addr())
);
#[cfg(feature = "qlog")]
let mut qlog_frames: Vec<qlog::events::quic::QuicFrame> =
Vec::with_capacity(frames.len());
for frame in &mut frames {
trace!("{} tx frm {:?}", self.trace_id, frame);
qlog_with_type!(QLOG_PACKET_TX, self.qlog, _q, {
qlog_frames.push(frame.to_qlog());
});
}
qlog_with_type!(QLOG_PACKET_TX, self.qlog, q, {
if let Some(header) = qlog_pkt_hdr {
let length = payload_len + payload_offset + crypto_overhead;
let qlog_raw_info = RawInfo {
length: Some(length as u64),
payload_length: Some(payload_len as u64),
data: None,
};
let send_at_time =
now.duration_since(q.start_time()).as_secs_f64() * 1000.0;
let ev_data =
EventData::QuicPacketSent(qlog::events::quic::PacketSent {
header,
frames: Some(qlog_frames),
raw: Some(qlog_raw_info),
send_at_time: Some(send_at_time),
..Default::default()
});
q.add_event_data_with_instant(ev_data, now).ok();
}
});
let aead = match crypto_ctx.crypto_seal {
Some(ref mut v) => v,
None => return Err(Error::InvalidState),
};
let written = packet::encrypt_pkt(
&mut b,
pn,
pn_len,
payload_len,
payload_offset,
None,
aead,
)?;
let sent_pkt_has_data = if path.recovery.gcongestion_enabled() {
has_data || dgram_emitted
} else {
has_data
};
let sent_pkt = recovery::Sent {
pkt_num: pn,
frames,
time_sent: now,
time_acked: None,
time_lost: None,
size: if ack_eliciting { written } else { 0 },
ack_eliciting,
in_flight,
delivered: 0,
delivered_time: now,
first_sent_time: now,
is_app_limited: false,
tx_in_flight: 0,
lost: 0,
has_data: sent_pkt_has_data,
is_pmtud_probe,
};
if in_flight && is_app_limited {
path.recovery.delivery_rate_update_app_limited(true);
}
self.next_pkt_num += 1;
let handshake_status = recovery::HandshakeStatus {
has_handshake_keys: self.crypto_ctx[packet::Epoch::Handshake]
.has_keys(),
peer_verified_address: self.peer_verified_initial_address,
completed: self.handshake_completed,
};
self.on_packet_sent(send_pid, sent_pkt, epoch, handshake_status, now)?;
let path = self.paths.get_mut(send_pid)?;
qlog_with_type!(QLOG_METRICS, self.qlog, q, {
path.recovery.maybe_qlog(q, now);
});
if let Some(data) = challenge_data {
path.add_challenge_sent(data, written, now);
}
self.sent_count += 1;
self.sent_bytes += written as u64;
path.sent_count += 1;
path.sent_bytes += written as u64;
if self.dgram_send_queue.byte_size() > path.recovery.cwnd_available() {
path.recovery.update_app_limited(false);
}
path.max_send_bytes = path.max_send_bytes.saturating_sub(written);
if !self.is_server && hdr_ty == Type::Handshake {
self.drop_epoch_state(packet::Epoch::Initial, now);
}
if ack_eliciting && !self.ack_eliciting_sent {
if let Some(idle_timeout) = self.idle_timeout() {
self.idle_timer = Some(now + idle_timeout);
}
}
if ack_eliciting {
self.ack_eliciting_sent = true;
}
Ok((pkt_type, written))
}
fn on_packet_sent(
&mut self, send_pid: usize, sent_pkt: recovery::Sent,
epoch: packet::Epoch, handshake_status: recovery::HandshakeStatus,
now: Instant,
) -> Result<()> {
let path = self.paths.get_mut(send_pid)?;
let cwnd = path.recovery.cwnd();
let max_datagram_size = path.recovery.max_datagram_size();
self.pkt_num_spaces[epoch].on_packet_sent(&sent_pkt);
self.pkt_num_manager.on_packet_sent(
cwnd,
max_datagram_size,
self.handshake_completed,
);
path.recovery.on_packet_sent(
sent_pkt,
epoch,
handshake_status,
now,
&self.trace_id,
);
Ok(())
}
#[inline]
pub fn get_next_release_time(&self) -> Option<ReleaseDecision> {
Some(
self.paths
.get_active()
.ok()?
.recovery
.get_next_release_time(),
)
}
#[inline]
pub fn gcongestion_enabled(&self) -> Option<bool> {
Some(self.paths.get_active().ok()?.recovery.gcongestion_enabled())
}
pub fn max_release_into_future(&self) -> Duration {
self.paths
.get_active()
.map(|p| p.recovery.rtt().mul_f64(0.125))
.unwrap_or(Duration::from_millis(1))
.max(Duration::from_millis(1))
.min(Duration::from_millis(5))
}
#[inline]
pub fn pacing_enabled(&self) -> bool {
self.recovery_config.pacing
}
#[inline]
pub fn send_quantum(&self) -> usize {
match self.paths.get_active() {
Ok(p) => p.recovery.send_quantum(),
_ => 0,
}
}
pub fn send_quantum_on_path(
&self, local_addr: SocketAddr, peer_addr: SocketAddr,
) -> usize {
self.paths
.path_id_from_addrs(&(local_addr, peer_addr))
.and_then(|pid| self.paths.get(pid).ok())
.map(|path| path.recovery.send_quantum())
.unwrap_or(0)
}
#[inline]
pub fn stream_recv(
&mut self, stream_id: u64, out: &mut [u8],
) -> Result<(usize, bool)> {
self.stream_recv_buf(stream_id, out)
}
pub fn stream_recv_buf<B: bytes::BufMut>(
&mut self, stream_id: u64, out: B,
) -> Result<(usize, bool)> {
self.do_stream_recv(stream_id, RecvAction::Emit { out })
}
pub fn stream_discard(
&mut self, stream_id: u64, len: usize,
) -> Result<(usize, bool)> {
self.do_stream_recv::<&mut [u8]>(stream_id, RecvAction::Discard { len })
}
fn do_stream_recv<B: bytes::BufMut>(
&mut self, stream_id: u64, action: RecvAction<B>,
) -> Result<(usize, bool)> {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState(stream_id));
}
let stream = self
.streams
.get_mut(stream_id)
.ok_or(Error::InvalidStreamState(stream_id))?;
if !stream.is_readable() {
return Err(Error::Done);
}
let local = stream.local;
let priority_key = Arc::clone(&stream.priority_key);
#[cfg(feature = "qlog")]
let offset = stream.recv.off_front();
#[cfg(feature = "qlog")]
let to = match action {
RecvAction::Emit { .. } => Some(DataRecipient::Application),
RecvAction::Discard { .. } => Some(DataRecipient::Dropped),
};
let (read, fin) = match stream.recv.emit_or_discard(action) {
Ok(v) => v,
Err(e) => {
if stream.is_complete() {
self.streams.collect(stream_id, local);
}
self.streams.remove_readable(&priority_key);
return Err(e);
},
};
self.flow_control.add_consumed(read as u64);
let readable = stream.is_readable();
let complete = stream.is_complete();
if stream.recv.almost_full() {
self.streams.insert_almost_full(stream_id);
}
if !readable {
self.streams.remove_readable(&priority_key);
}
if complete {
self.streams.collect(stream_id, local);
}
qlog_with_type!(QLOG_DATA_MV, self.qlog, q, {
let ev_data = EventData::QuicStreamDataMoved(
qlog::events::quic::StreamDataMoved {
stream_id: Some(stream_id),
offset: Some(offset),
raw: Some(RawInfo {
length: Some(read as u64),
..Default::default()
}),
from: Some(DataRecipient::Transport),
to,
additional_info: fin
.then_some(DataMovedAdditionalInfo::FinSet),
},
);
let now = Instant::now();
q.add_event_data_with_instant(ev_data, now).ok();
});
if priority_key.incremental && readable {
self.streams.remove_readable(&priority_key);
self.streams.insert_readable(&priority_key);
}
Ok((read, fin))
}
pub fn stream_send(
&mut self, stream_id: u64, buf: &[u8], fin: bool,
) -> Result<usize> {
self.stream_do_send(
stream_id,
buf,
fin,
|stream: &mut stream::Stream<F>,
buf: &[u8],
cap: usize,
fin: bool| {
stream.send.write(&buf[..cap], fin).map(|v| (v, v))
},
)
}
pub fn stream_send_zc(
&mut self, stream_id: u64, buf: F::Buf, len: Option<usize>, fin: bool,
) -> Result<(usize, Option<F::Buf>)>
where
F::Buf: BufSplit,
{
self.stream_do_send(
stream_id,
buf,
fin,
|stream: &mut stream::Stream<F>,
buf: F::Buf,
cap: usize,
fin: bool| {
let len = len.unwrap_or(usize::MAX).min(cap);
let (sent, remaining) = stream.send.append_buf(buf, len, fin)?;
Ok((sent, (sent, remaining)))
},
)
}
fn stream_do_send<B, R, SND>(
&mut self, stream_id: u64, buf: B, fin: bool, write_fn: SND,
) -> Result<R>
where
B: AsRef<[u8]>,
SND: FnOnce(&mut stream::Stream<F>, B, usize, bool) -> Result<(usize, R)>,
{
if !stream::is_bidi(stream_id) &&
!stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState(stream_id));
}
let len = buf.as_ref().len();
if self.max_tx_data - self.tx_data < len as u64 {
self.blocked_limit = Some(self.max_tx_data);
}
let cap = self.tx_cap;
let stream = match self.get_or_create_stream(stream_id, true) {
Ok(v) => v,
Err(Error::StreamLimit) => {
if self.enable_send_streams_blocked &&
stream::is_local(stream_id, self.is_server)
{
if stream::is_bidi(stream_id) {
let limit = self.streams.peer_max_streams_bidi();
self.streams_blocked_bidi_state.update_at(limit);
} else {
let limit = self.streams.peer_max_streams_uni();
self.streams_blocked_uni_state.update_at(limit);
}
}
return Err(Error::StreamLimit);
},
Err(e) => return Err(e),
};
#[cfg(feature = "qlog")]
let offset = stream.send.off_back();
let was_writable = stream.is_writable();
let was_flushable = stream.is_flushable();
let is_complete = stream.is_complete();
let is_readable = stream.is_readable();
let priority_key = Arc::clone(&stream.priority_key);
if let Err(Error::StreamStopped(e)) = stream.send.cap() {
if is_complete && !is_readable {
let local = stream.local;
self.streams.collect(stream_id, local);
}
return Err(Error::StreamStopped(e));
};
if cap == 0 && len > 0 {
if was_writable {
self.streams.insert_writable(&priority_key);
}
return Err(Error::Done);
}
let (cap, fin, blocked_by_cap) = if cap < len {
(cap, false, true)
} else {
(len, fin, false)
};
let (sent, ret) = match write_fn(stream, buf, cap, fin) {
Ok(v) => v,
Err(e) => {
self.streams.remove_writable(&priority_key);
return Err(e);
},
};
let incremental = stream.incremental;
let priority_key = Arc::clone(&stream.priority_key);
let flushable = stream.is_flushable();
let writable = stream.is_writable();
let empty_fin = len == 0 && fin;
if sent < cap {
let max_off = stream.send.max_off();
if stream.send.blocked_at() != Some(max_off) {
stream.send.update_blocked_at(Some(max_off));
self.streams.insert_blocked(stream_id, max_off);
}
} else {
stream.send.update_blocked_at(None);
self.streams.remove_blocked(stream_id);
}
if (flushable || empty_fin) && !was_flushable {
self.streams.insert_flushable(&priority_key);
}
if !writable {
self.streams.remove_writable(&priority_key);
} else if was_writable && blocked_by_cap {
self.streams.insert_writable(&priority_key);
}
self.tx_cap -= sent;
self.tx_data += sent as u64;
self.tx_buffered += sent;
self.check_tx_buffered_invariant();
qlog_with_type!(QLOG_DATA_MV, self.qlog, q, {
let ev_data = EventData::QuicStreamDataMoved(
qlog::events::quic::StreamDataMoved {
stream_id: Some(stream_id),
offset: Some(offset),
raw: Some(RawInfo {
length: Some(sent as u64),
..Default::default()
}),
from: Some(DataRecipient::Application),
to: Some(DataRecipient::Transport),
additional_info: fin
.then_some(DataMovedAdditionalInfo::FinSet),
},
);
let now = Instant::now();
q.add_event_data_with_instant(ev_data, now).ok();
});
if sent == 0 && cap > 0 {
return Err(Error::Done);
}
if incremental && writable {
self.streams.remove_writable(&priority_key);
self.streams.insert_writable(&priority_key);
}
Ok(ret)
}
pub fn stream_priority(
&mut self, stream_id: u64, urgency: u8, incremental: bool,
) -> Result<()> {
let stream = match self.get_or_create_stream(stream_id, true) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
if stream.urgency == urgency && stream.incremental == incremental {
return Ok(());
}
stream.urgency = urgency;
stream.incremental = incremental;
let new_priority_key = Arc::new(StreamPriorityKey {
urgency: stream.urgency,
incremental: stream.incremental,
id: stream_id,
..Default::default()
});
let old_priority_key =
std::mem::replace(&mut stream.priority_key, new_priority_key.clone());
self.streams
.update_priority(&old_priority_key, &new_priority_key);
Ok(())
}
pub fn stream_shutdown(
&mut self, stream_id: u64, direction: Shutdown, err: u64,
) -> Result<()> {
if direction == Shutdown::Read &&
stream::is_local(stream_id, self.is_server) &&
!stream::is_bidi(stream_id)
{
return Err(Error::InvalidStreamState(stream_id));
}
if direction == Shutdown::Write &&
!stream::is_local(stream_id, self.is_server) &&
!stream::is_bidi(stream_id)
{
return Err(Error::InvalidStreamState(stream_id));
}
let stream = self.streams.get_mut(stream_id).ok_or(Error::Done)?;
let priority_key = Arc::clone(&stream.priority_key);
match direction {
Shutdown::Read => {
let consumed = stream.recv.shutdown()?;
self.flow_control.add_consumed(consumed);
if !stream.recv.is_fin() {
self.streams.insert_stopped(stream_id, err);
}
self.streams.remove_readable(&priority_key);
self.stopped_stream_local_count =
self.stopped_stream_local_count.saturating_add(1);
},
Shutdown::Write => {
let (final_size, unsent) = stream.send.shutdown()?;
self.tx_data = self.tx_data.saturating_sub(unsent);
self.tx_buffered =
self.tx_buffered.saturating_sub(unsent as usize);
qlog_with_type!(QLOG_DATA_MV, self.qlog, q, {
let ev_data = EventData::QuicStreamDataMoved(
qlog::events::quic::StreamDataMoved {
stream_id: Some(stream_id),
offset: Some(final_size),
raw: Some(RawInfo {
length: Some(unsent),
..Default::default()
}),
from: Some(DataRecipient::Transport),
to: Some(DataRecipient::Dropped),
..Default::default()
},
);
q.add_event_data_with_instant(ev_data, Instant::now()).ok();
});
self.update_tx_cap();
self.streams.insert_reset(stream_id, err, final_size);
self.streams.remove_writable(&priority_key);
self.reset_stream_local_count =
self.reset_stream_local_count.saturating_add(1);
},
}
Ok(())
}
#[inline]
pub fn stream_capacity(&mut self, stream_id: u64) -> Result<usize> {
if let Some(stream) = self.streams.get(stream_id) {
let stream_cap = match stream.send.cap() {
Ok(v) => v,
Err(Error::StreamStopped(e)) => {
if stream.is_complete() && !stream.is_readable() {
let local = stream.local;
self.streams.collect(stream_id, local);
}
return Err(Error::StreamStopped(e));
},
Err(e) => return Err(e),
};
let cap = cmp::min(self.tx_cap, stream_cap);
return Ok(cap);
};
Err(Error::InvalidStreamState(stream_id))
}
pub fn stream_readable_next(&mut self) -> Option<u64> {
let priority_key = self.streams.readable.front().clone_pointer()?;
self.streams.remove_readable(&priority_key);
Some(priority_key.id)
}
pub fn stream_readable(&self, stream_id: u64) -> bool {
let stream = match self.streams.get(stream_id) {
Some(v) => v,
None => return false,
};
stream.is_readable()
}
pub fn stream_writable_next(&mut self) -> Option<u64> {
if self.tx_cap == 0 {
return None;
}
let mut cursor = self.streams.writable.front();
while let Some(priority_key) = cursor.clone_pointer() {
if let Some(stream) = self.streams.get(priority_key.id) {
let cap = match stream.send.cap() {
Ok(v) => v,
Err(_) =>
return {
self.streams.remove_writable(&priority_key);
Some(priority_key.id)
},
};
if cmp::min(self.tx_cap, cap) >= stream.send_lowat {
self.streams.remove_writable(&priority_key);
return Some(priority_key.id);
}
}
cursor.move_next();
}
None
}
#[inline]
pub fn stream_writable(
&mut self, stream_id: u64, len: usize,
) -> Result<bool> {
if self.stream_capacity(stream_id)? >= len {
return Ok(true);
}
let stream = match self.streams.get_mut(stream_id) {
Some(v) => v,
None => return Err(Error::InvalidStreamState(stream_id)),
};
stream.send_lowat = cmp::max(1, len);
let is_writable = stream.is_writable();
let priority_key = Arc::clone(&stream.priority_key);
if self.max_tx_data - self.tx_data < len as u64 {
self.blocked_limit = Some(self.max_tx_data);
}
if stream.send.cap()? < len {
let max_off = stream.send.max_off();
if stream.send.blocked_at() != Some(max_off) {
stream.send.update_blocked_at(Some(max_off));
self.streams.insert_blocked(stream_id, max_off);
}
} else if is_writable {
self.streams.insert_writable(&priority_key);
}
Ok(false)
}
#[inline]
pub fn stream_finished(&self, stream_id: u64) -> bool {
let stream = match self.streams.get(stream_id) {
Some(v) => v,
None => return true,
};
stream.recv.is_fin()
}
#[inline]
pub fn peer_streams_left_bidi(&self) -> u64 {
self.streams.peer_streams_left_bidi()
}
#[inline]
pub fn peer_streams_left_uni(&self) -> u64 {
self.streams.peer_streams_left_uni()
}
#[inline]
pub fn readable(&self) -> StreamIter {
self.streams.readable()
}
#[inline]
pub fn writable(&self) -> StreamIter {
if self.tx_cap == 0 {
return StreamIter::default();
}
self.streams.writable()
}
pub fn max_send_udp_payload_size(&self) -> usize {
let max_datagram_size = self
.paths
.get_active()
.ok()
.map(|p| p.recovery.max_datagram_size());
if let Some(max_datagram_size) = max_datagram_size {
if self.is_established() {
return cmp::min(16383, max_datagram_size);
}
}
MIN_CLIENT_INITIAL_LEN
}
pub fn send_ack_eliciting(&mut self) -> Result<()> {
if self.is_closed() || self.is_draining() {
return Ok(());
}
self.paths.get_active_mut()?.needs_ack_eliciting = true;
Ok(())
}
pub fn send_ack_eliciting_on_path(
&mut self, local: SocketAddr, peer: SocketAddr,
) -> Result<()> {
if self.is_closed() || self.is_draining() {
return Ok(());
}
let path_id = self
.paths
.path_id_from_addrs(&(local, peer))
.ok_or(Error::InvalidState)?;
self.paths.get_mut(path_id)?.needs_ack_eliciting = true;
Ok(())
}
#[inline]
pub fn dgram_recv(&mut self, buf: &mut [u8]) -> Result<usize> {
match self.dgram_recv_queue.pop() {
Some(d) => {
if d.as_ref().len() > buf.len() {
return Err(Error::BufferTooShort);
}
let len = d.as_ref().len();
buf[..len].copy_from_slice(d.as_ref());
Ok(len)
},
None => Err(Error::Done),
}
}
#[inline]
pub fn dgram_recv_buf(&mut self) -> Result<F::DgramBuf> {
self.dgram_recv_queue.pop().ok_or(Error::Done)
}
#[inline]
pub fn dgram_recv_peek(&self, buf: &mut [u8], len: usize) -> Result<usize> {
self.dgram_recv_queue.peek_front_bytes(buf, len)
}
#[inline]
pub fn dgram_recv_front_len(&self) -> Option<usize> {
self.dgram_recv_queue.peek_front_len()
}
#[inline]
pub fn dgram_recv_queue_len(&self) -> usize {
self.dgram_recv_queue.len()
}
#[inline]
pub fn dgram_recv_queue_byte_size(&self) -> usize {
self.dgram_recv_queue.byte_size()
}
#[inline]
pub fn dgram_send_queue_len(&self) -> usize {
self.dgram_send_queue.len()
}
#[inline]
pub fn dgram_send_queue_byte_size(&self) -> usize {
self.dgram_send_queue.byte_size()
}
#[inline]
pub fn is_dgram_send_queue_full(&self) -> bool {
self.dgram_send_queue.is_full()
}
#[inline]
pub fn is_dgram_recv_queue_full(&self) -> bool {
self.dgram_recv_queue.is_full()
}
pub fn dgram_send(&mut self, buf: &[u8]) -> Result<()> {
self.dgram_send_buf(F::dgram_buf_from_slice(buf))
}
pub fn dgram_send_buf(&mut self, buf: F::DgramBuf) -> Result<()> {
let max_payload_len = match self.dgram_max_writable_len() {
Some(v) => v,
None => return Err(Error::InvalidState),
};
if buf.as_ref().len() > max_payload_len {
return Err(Error::BufferTooShort);
}
self.dgram_send_queue.push(buf)?;
let active_path = self.paths.get_active_mut()?;
if self.dgram_send_queue.byte_size() >
active_path.recovery.cwnd_available()
{
active_path.recovery.update_app_limited(false);
}
Ok(())
}
#[inline]
pub fn dgram_purge_outgoing<FN: Fn(&[u8]) -> bool>(&mut self, f: FN) {
self.dgram_send_queue.purge(f);
}
#[inline]
pub fn dgram_max_writable_len(&self) -> Option<usize> {
match self.peer_transport_params.max_datagram_frame_size {
None => None,
Some(peer_frame_len) => {
let dcid = self.destination_id();
let mut max_len = self.max_send_udp_payload_size();
max_len = max_len.saturating_sub(1 + dcid.len());
max_len = max_len.saturating_sub(packet::MAX_PKT_NUM_LEN);
max_len = max_len.saturating_sub(
self.crypto_ctx[packet::Epoch::Application]
.crypto_overhead()?,
);
max_len = cmp::min(peer_frame_len as usize, max_len);
max_len.checked_sub(1 + frame::MAX_DGRAM_OVERHEAD)
},
}
}
fn dgram_enabled(&self) -> bool {
self.local_transport_params
.max_datagram_frame_size
.is_some()
}
pub fn timeout_instant(&self) -> Option<Instant> {
if self.is_closed() {
return None;
}
if self.is_draining() {
self.draining_timer
} else {
let path_timer = self
.paths
.iter()
.filter_map(|(_, p)| p.recovery.loss_detection_timer())
.min();
let key_update_timer = self.crypto_ctx[packet::Epoch::Application]
.key_update
.as_ref()
.map(|key_update| key_update.timer);
let timers = [self.idle_timer, path_timer, key_update_timer];
timers.iter().filter_map(|&x| x).min()
}
}
pub fn timeout(&self) -> Option<Duration> {
self.timeout_instant().map(|timeout| {
let now = Instant::now();
if timeout <= now {
Duration::ZERO
} else {
timeout.duration_since(now)
}
})
}
pub fn on_timeout(&mut self) {
let now = Instant::now();
if let Some(draining_timer) = self.draining_timer {
if draining_timer <= now {
trace!("{} draining timeout expired", self.trace_id);
self.mark_closed();
}
return;
}
if let Some(timer) = self.idle_timer {
if timer <= now {
trace!("{} idle timeout expired", self.trace_id);
self.mark_closed();
self.timed_out = true;
return;
}
}
if let Some(timer) = self.crypto_ctx[packet::Epoch::Application]
.key_update
.as_ref()
.map(|key_update| key_update.timer)
{
if timer <= now {
let _ = self.crypto_ctx[packet::Epoch::Application]
.key_update
.take();
}
}
let handshake_status = self.handshake_status();
for (_, p) in self.paths.iter_mut() {
if let Some(timer) = p.recovery.loss_detection_timer() {
if timer <= now {
trace!("{} loss detection timeout expired", self.trace_id);
let OnLossDetectionTimeoutOutcome {
lost_packets,
lost_bytes,
} = p.on_loss_detection_timeout(
handshake_status,
now,
self.is_server,
&self.trace_id,
);
self.lost_count += lost_packets;
self.lost_bytes += lost_bytes as u64;
qlog_with_type!(QLOG_METRICS, self.qlog, q, {
p.recovery.maybe_qlog(q, now);
});
}
}
}
self.paths.notify_failed_validations();
if self.paths.get_active_path_id().is_err() {
match self.paths.find_candidate_path() {
Some(pid) => {
if self.set_active_path(pid, now).is_err() {
self.mark_closed();
}
},
None => {
self.mark_closed();
},
}
}
}
pub fn probe_path(
&mut self, local_addr: SocketAddr, peer_addr: SocketAddr,
) -> Result<u64> {
let pid = match self.paths.path_id_from_addrs(&(local_addr, peer_addr)) {
Some(pid) => pid,
None => self.create_path_on_client(local_addr, peer_addr)?,
};
let path = self.paths.get_mut(pid)?;
path.request_validation();
path.active_dcid_seq.ok_or(Error::InvalidState)
}
pub fn migrate_source(&mut self, local_addr: SocketAddr) -> Result<u64> {
let peer_addr = self.paths.get_active()?.peer_addr();
self.migrate(local_addr, peer_addr)
}
pub fn migrate(
&mut self, local_addr: SocketAddr, peer_addr: SocketAddr,
) -> Result<u64> {
if self.is_server {
return Err(Error::InvalidState);
}
let (pid, dcid_seq) = if let Some(pid) =
self.paths.path_id_from_addrs(&(local_addr, peer_addr))
{
let path = self.paths.get_mut(pid)?;
if path.active() {
return path.active_dcid_seq.ok_or(Error::OutOfIdentifiers);
}
if !self.ids.zero_length_scid() &&
path.active_scid_seq.is_none() &&
self.ids.available_scids() == 0
{
return Err(Error::OutOfIdentifiers);
}
let dcid_seq = if let Some(dcid_seq) = path.active_dcid_seq {
dcid_seq
} else {
let dcid_seq = self
.ids
.lowest_available_dcid_seq()
.ok_or(Error::OutOfIdentifiers)?;
self.ids.link_dcid_to_path_id(dcid_seq, pid)?;
path.active_dcid_seq = Some(dcid_seq);
dcid_seq
};
(pid, dcid_seq)
} else {
let pid = self.create_path_on_client(local_addr, peer_addr)?;
let dcid_seq = self
.paths
.get(pid)?
.active_dcid_seq
.ok_or(Error::InvalidState)?;
(pid, dcid_seq)
};
self.set_active_path(pid, Instant::now())?;
Ok(dcid_seq)
}
pub fn new_scid(
&mut self, scid: &ConnectionId, reset_token: u128, retire_if_needed: bool,
) -> Result<u64> {
self.ids.new_scid(
scid.to_vec().into(),
Some(reset_token),
true,
None,
retire_if_needed,
)
}
pub fn active_scids(&self) -> usize {
self.ids.active_source_cids()
}
#[inline]
pub fn scids_left(&self) -> usize {
let max_active_source_cids = cmp::min(
self.peer_transport_params.active_conn_id_limit,
self.local_transport_params.active_conn_id_limit,
) as usize;
max_active_source_cids - self.active_scids()
}
pub fn retire_dcid(&mut self, dcid_seq: u64) -> Result<()> {
if self.ids.zero_length_dcid() {
return Err(Error::InvalidState);
}
let active_path_dcid_seq = self
.paths
.get_active()?
.active_dcid_seq
.ok_or(Error::InvalidState)?;
let active_path_id = self.paths.get_active_path_id()?;
if active_path_dcid_seq == dcid_seq &&
self.ids.lowest_available_dcid_seq().is_none() &&
!self
.paths
.iter()
.any(|(pid, p)| pid != active_path_id && p.usable())
{
return Err(Error::OutOfIdentifiers);
}
if let Some(pid) = self.ids.retire_dcid(dcid_seq)? {
let path = self.paths.get_mut(pid)?;
let dcid_seq = self.ids.lowest_available_dcid_seq();
if let Some(dcid_seq) = dcid_seq {
self.ids.link_dcid_to_path_id(dcid_seq, pid)?;
}
path.active_dcid_seq = dcid_seq;
}
Ok(())
}
pub fn path_event_next(&mut self) -> Option<PathEvent> {
self.paths.pop_event()
}
pub fn retired_scids(&self) -> usize {
self.ids.retired_source_cids()
}
pub fn retired_scid_next(&mut self) -> Option<ConnectionId<'static>> {
self.ids.pop_retired_scid()
}
pub fn available_dcids(&self) -> usize {
self.ids.available_dcids()
}
#[inline]
pub fn paths_iter(&self, from: SocketAddr) -> SocketAddrIter {
SocketAddrIter {
sockaddrs: self
.paths
.iter()
.filter(|(_, p)| p.active_dcid_seq.is_some())
.filter(|(_, p)| p.usable() || p.probing_required())
.filter(|(_, p)| p.local_addr() == from)
.map(|(_, p)| p.peer_addr())
.collect(),
index: 0,
}
}
pub fn close(&mut self, app: bool, err: u64, reason: &[u8]) -> Result<()> {
if self.is_closed() || self.is_draining() {
return Err(Error::Done);
}
if self.local_error.is_some() {
return Err(Error::Done);
}
let is_safe_to_send_app_data =
self.is_established() || self.is_in_early_data();
if app && !is_safe_to_send_app_data {
self.local_error = Some(ConnectionError {
is_app: false,
error_code: 0x0c,
reason: vec![],
});
} else {
self.local_error = Some(ConnectionError {
is_app: app,
error_code: err,
reason: reason.to_vec(),
});
}
if self.recv_count == 0 {
self.mark_closed();
}
Ok(())
}
#[inline]
pub fn trace_id(&self) -> &str {
&self.trace_id
}
#[inline]
pub fn application_proto(&self) -> &[u8] {
self.alpn.as_ref()
}
#[inline]
pub fn server_name(&self) -> Option<&str> {
self.handshake.server_name()
}
#[inline]
pub fn peer_cert(&self) -> Option<&[u8]> {
self.handshake.peer_cert()
}
#[inline]
pub fn peer_cert_chain(&self) -> Option<Vec<&[u8]>> {
self.handshake.peer_cert_chain()
}
#[inline]
pub fn session(&self) -> Option<&[u8]> {
self.session.as_deref()
}
#[inline]
pub fn source_id(&self) -> ConnectionId<'_> {
if let Ok(path) = self.paths.get_active() {
if let Some(active_scid_seq) = path.active_scid_seq {
if let Ok(e) = self.ids.get_scid(active_scid_seq) {
return ConnectionId::from_ref(e.cid.as_ref());
}
}
}
let e = self.ids.oldest_scid();
ConnectionId::from_ref(e.cid.as_ref())
}
#[inline]
pub fn source_ids(&self) -> impl Iterator<Item = &ConnectionId<'_>> {
self.ids.scids_iter()
}
#[inline]
pub fn destination_id(&self) -> ConnectionId<'_> {
if let Ok(path) = self.paths.get_active() {
if let Some(active_dcid_seq) = path.active_dcid_seq {
if let Ok(e) = self.ids.get_dcid(active_dcid_seq) {
return ConnectionId::from_ref(e.cid.as_ref());
}
}
}
let e = self.ids.oldest_dcid();
ConnectionId::from_ref(e.cid.as_ref())
}
#[inline]
pub fn pmtu(&self) -> Option<usize> {
if let Ok(path) = self.paths.get_active() {
path.pmtud.as_ref().and_then(|pmtud| pmtud.get_pmtu())
} else {
None
}
}
#[inline]
pub fn revalidate_pmtu(&mut self) {
if let Ok(active_path) = self.paths.get_active_mut() {
if let Some(pmtud) = active_path.pmtud.as_mut() {
pmtud.revalidate_pmtu();
}
}
}
#[inline]
pub fn is_established(&self) -> bool {
self.handshake_completed
}
#[inline]
pub fn is_resumed(&self) -> bool {
self.handshake.is_resumed()
}
#[inline]
pub fn is_in_early_data(&self) -> bool {
self.handshake.is_in_early_data()
}
#[inline]
pub fn early_data_reason(&self) -> u32 {
self.handshake.early_data_reason()
}
#[inline]
pub fn is_readable(&self) -> bool {
self.streams.has_readable() || self.dgram_recv_front_len().is_some()
}
pub fn is_path_validated(
&self, from: SocketAddr, to: SocketAddr,
) -> Result<bool> {
let pid = self
.paths
.path_id_from_addrs(&(from, to))
.ok_or(Error::InvalidState)?;
Ok(self.paths.get(pid)?.validated())
}
#[inline]
pub fn is_draining(&self) -> bool {
self.draining_timer.is_some()
}
#[inline]
pub fn is_closed(&self) -> bool {
self.closed
}
#[inline]
pub fn is_timed_out(&self) -> bool {
self.timed_out
}
#[inline]
pub fn peer_error(&self) -> Option<&ConnectionError> {
self.peer_error.as_ref()
}
#[inline]
pub fn local_error(&self) -> Option<&ConnectionError> {
self.local_error.as_ref()
}
#[inline]
pub fn stats(&self) -> Stats {
Stats {
recv: self.recv_count,
sent: self.sent_count,
lost: self.lost_count,
spurious_lost: self.spurious_lost_count,
retrans: self.retrans_count,
sent_bytes: self.sent_bytes,
recv_bytes: self.recv_bytes,
acked_bytes: self.acked_bytes,
lost_bytes: self.lost_bytes,
stream_retrans_bytes: self.stream_retrans_bytes,
dgram_recv: self.dgram_recv_count,
dgram_sent: self.dgram_sent_count,
paths_count: self.paths.len(),
reset_stream_count_local: self.reset_stream_local_count,
stopped_stream_count_local: self.stopped_stream_local_count,
reset_stream_count_remote: self.reset_stream_remote_count,
stopped_stream_count_remote: self.stopped_stream_remote_count,
data_blocked_sent_count: self.data_blocked_sent_count,
stream_data_blocked_sent_count: self.stream_data_blocked_sent_count,
data_blocked_recv_count: self.data_blocked_recv_count,
stream_data_blocked_recv_count: self.stream_data_blocked_recv_count,
streams_blocked_bidi_recv_count: self.streams_blocked_bidi_recv_count,
streams_blocked_uni_recv_count: self.streams_blocked_uni_recv_count,
path_challenge_rx_count: self.path_challenge_rx_count,
bytes_in_flight_duration: self.bytes_in_flight_duration(),
tx_buffered_state: self.tx_buffered_state,
}
}
fn bytes_in_flight_duration(&self) -> Duration {
self.paths.iter().fold(Duration::ZERO, |acc, (_, path)| {
acc + path.bytes_in_flight_duration()
})
}
pub fn peer_transport_params(&self) -> Option<&TransportParams> {
if !self.parsed_peer_transport_params {
return None;
}
Some(&self.peer_transport_params)
}
pub fn path_stats(&self) -> impl Iterator<Item = PathStats> + '_ {
self.paths.iter().map(|(_, p)| p.stats())
}
pub fn is_server(&self) -> bool {
self.is_server
}
fn encode_transport_params(&mut self) -> Result<()> {
self.handshake.set_quic_transport_params(
&self.local_transport_params,
self.is_server,
)
}
fn parse_peer_transport_params(
&mut self, peer_params: TransportParams,
) -> Result<()> {
match &peer_params.initial_source_connection_id {
Some(v) if v != &self.destination_id() =>
return Err(Error::InvalidTransportParam),
Some(_) => (),
None => return Err(Error::InvalidTransportParam),
}
if let Some(odcid) = &self.odcid {
match &peer_params.original_destination_connection_id {
Some(v) if v != odcid =>
return Err(Error::InvalidTransportParam),
Some(_) => (),
None if !self.is_server =>
return Err(Error::InvalidTransportParam),
None => (),
}
}
if let Some(rscid) = &self.rscid {
match &peer_params.retry_source_connection_id {
Some(v) if v != rscid =>
return Err(Error::InvalidTransportParam),
Some(_) => (),
None => return Err(Error::InvalidTransportParam),
}
}
self.process_peer_transport_params(peer_params)?;
self.parsed_peer_transport_params = true;
Ok(())
}
fn process_peer_transport_params(
&mut self, peer_params: TransportParams,
) -> Result<()> {
self.max_tx_data = peer_params.initial_max_data;
self.update_tx_cap();
self.streams
.update_peer_max_streams_bidi(peer_params.initial_max_streams_bidi);
self.streams
.update_peer_max_streams_uni(peer_params.initial_max_streams_uni);
let max_ack_delay = Duration::from_millis(peer_params.max_ack_delay);
self.recovery_config.max_ack_delay = max_ack_delay;
let active_path = self.paths.get_active_mut()?;
active_path.recovery.update_max_ack_delay(max_ack_delay);
if active_path
.pmtud
.as_ref()
.map(|pmtud| pmtud.should_probe())
.unwrap_or(false)
{
active_path.recovery.pmtud_update_max_datagram_size(
active_path
.pmtud
.as_mut()
.expect("PMTUD existence verified above")
.get_probe_size()
.min(peer_params.max_udp_payload_size as usize),
);
} else {
active_path.recovery.update_max_datagram_size(
peer_params.max_udp_payload_size as usize,
);
}
self.ids
.set_source_conn_id_limit(peer_params.active_conn_id_limit);
self.peer_transport_params = peer_params;
Ok(())
}
fn do_handshake(&mut self, now: Instant) -> Result<()> {
let mut ex_data = tls::ExData {
application_protos: &self.application_protos,
crypto_ctx: &mut self.crypto_ctx,
session: &mut self.session,
local_error: &mut self.local_error,
keylog: self.keylog.as_mut(),
trace_id: &self.trace_id,
local_transport_params: self.local_transport_params.clone(),
recovery_config: self.recovery_config,
tx_cap_factor: self.tx_cap_factor,
pmtud: None,
is_server: self.is_server,
use_initial_max_data_as_flow_control_win: false,
};
if self.handshake_completed {
return self.handshake.process_post_handshake(&mut ex_data);
}
match self.handshake.do_handshake(&mut ex_data) {
Ok(_) => (),
Err(Error::Done) => {
if self
.paths
.get_active()
.map(|p| p.can_reinit_recovery())
.unwrap_or(false)
{
if ex_data.recovery_config != self.recovery_config {
if let Ok(path) = self.paths.get_active_mut() {
self.recovery_config = ex_data.recovery_config;
path.reinit_recovery(&self.recovery_config);
}
}
if ex_data.tx_cap_factor != self.tx_cap_factor {
self.tx_cap_factor = ex_data.tx_cap_factor;
}
if let Some((discover, max_probes)) = ex_data.pmtud {
self.paths.set_discover_pmtu_on_existing_paths(
discover,
self.recovery_config.max_send_udp_payload_size,
max_probes,
);
}
if ex_data.local_transport_params !=
self.local_transport_params
{
self.streams.set_max_streams_bidi(
ex_data
.local_transport_params
.initial_max_streams_bidi,
);
self.local_transport_params =
ex_data.local_transport_params;
}
}
if ex_data.use_initial_max_data_as_flow_control_win {
self.enable_use_initial_max_data_as_flow_control_win();
}
let raw_params = self.handshake.quic_transport_params();
if !self.parsed_peer_transport_params && !raw_params.is_empty() {
let peer_params = TransportParams::decode(
raw_params,
self.is_server,
self.peer_transport_params_track_unknown,
)?;
self.parse_peer_transport_params(peer_params)?;
}
return Ok(());
},
Err(e) => return Err(e),
};
self.handshake_completed = self.handshake.is_completed();
self.alpn = self.handshake.alpn_protocol().to_vec();
let raw_params = self.handshake.quic_transport_params();
if !self.parsed_peer_transport_params && !raw_params.is_empty() {
let peer_params = TransportParams::decode(
raw_params,
self.is_server,
self.peer_transport_params_track_unknown,
)?;
self.parse_peer_transport_params(peer_params)?;
}
if self.handshake_completed {
if self.is_server {
self.handshake_confirmed = true;
self.drop_epoch_state(packet::Epoch::Handshake, now);
}
self.undecryptable_pkts.clear();
trace!("{} connection established: proto={:?} cipher={:?} curve={:?} sigalg={:?} resumed={} {:?}",
&self.trace_id,
std::str::from_utf8(self.application_proto()),
self.handshake.cipher(),
self.handshake.curve(),
self.handshake.sigalg(),
self.handshake.is_resumed(),
self.peer_transport_params);
}
Ok(())
}
fn enable_use_initial_max_data_as_flow_control_win(&mut self) {
self.flow_control.set_window_if_not_tuned_yet(
self.local_transport_params.initial_max_data,
);
self.streams
.set_use_initial_max_data_as_flow_control_win(true);
}
fn write_pkt_type(&self, send_pid: usize) -> Result<Type> {
if self
.local_error
.as_ref()
.is_some_and(|conn_err| !conn_err.is_app)
{
let epoch = match self.handshake.write_level() {
crypto::Level::Initial => packet::Epoch::Initial,
crypto::Level::ZeroRTT => unreachable!(),
crypto::Level::Handshake => packet::Epoch::Handshake,
crypto::Level::OneRTT => packet::Epoch::Application,
};
if !self.handshake_confirmed {
match epoch {
packet::Epoch::Application => return Ok(Type::Handshake),
packet::Epoch::Handshake
if self.crypto_ctx[packet::Epoch::Initial].has_keys() =>
return Ok(Type::Initial),
_ => (),
};
}
return Ok(Type::from_epoch(epoch));
}
for &epoch in packet::Epoch::epochs(
packet::Epoch::Initial..=packet::Epoch::Application,
) {
let crypto_ctx = &self.crypto_ctx[epoch];
let pkt_space = &self.pkt_num_spaces[epoch];
if crypto_ctx.crypto_seal.is_none() {
continue;
}
if crypto_ctx.data_available() || pkt_space.ready() {
return Ok(Type::from_epoch(epoch));
}
for (_, p) in self.paths.iter() {
if p.recovery.has_lost_frames(epoch) {
return Ok(Type::from_epoch(epoch));
}
if p.recovery.loss_probes(epoch) > 0 {
return Ok(Type::from_epoch(epoch));
}
}
}
let send_path = self.paths.get(send_pid)?;
if (self.is_established() || self.is_in_early_data()) &&
(self.should_send_handshake_done() ||
self.flow_control.should_update_max_data() ||
self.should_send_max_data ||
self.blocked_limit.is_some() ||
self.streams_blocked_bidi_state
.has_pending_stream_blocked_frame() ||
self.streams_blocked_uni_state
.has_pending_stream_blocked_frame() ||
self.dgram_send_queue.has_pending() ||
self.local_error
.as_ref()
.is_some_and(|conn_err| conn_err.is_app) ||
self.should_send_max_streams_bidi ||
self.streams.should_update_max_streams_bidi() ||
self.should_send_max_streams_uni ||
self.streams.should_update_max_streams_uni() ||
self.streams.has_flushable() ||
self.streams.has_almost_full() ||
self.streams.has_blocked() ||
self.streams.has_reset() ||
self.streams.has_stopped() ||
self.ids.has_new_scids() ||
self.ids.has_retire_dcids() ||
send_path
.pmtud
.as_ref()
.is_some_and(|pmtud| pmtud.should_probe()) ||
send_path.needs_ack_eliciting ||
send_path.probing_required())
{
if !self.is_server && self.is_in_early_data() {
return Ok(Type::ZeroRTT);
}
return Ok(Type::Short);
}
Err(Error::Done)
}
fn get_or_create_stream(
&mut self, id: u64, local: bool,
) -> Result<&mut stream::Stream<F>> {
self.streams.get_or_create(
id,
&self.local_transport_params,
&self.peer_transport_params,
local,
self.is_server,
)
}
fn process_frame(
&mut self, frame: frame::Frame, hdr: &Header, recv_path_id: usize,
epoch: packet::Epoch, now: Instant,
) -> Result<()> {
trace!("{} rx frm {:?}", self.trace_id, frame);
match frame {
frame::Frame::Padding { .. } => (),
frame::Frame::Ping { .. } => (),
frame::Frame::ACK {
ranges, ack_delay, ..
} => {
let ack_delay = ack_delay
.checked_mul(2_u64.pow(
self.peer_transport_params.ack_delay_exponent as u32,
))
.ok_or(Error::InvalidFrame)?;
if epoch == packet::Epoch::Handshake ||
(epoch == packet::Epoch::Application &&
self.is_established())
{
self.peer_verified_initial_address = true;
}
let handshake_status = self.handshake_status();
let is_app_limited = self.delivery_rate_check_if_app_limited();
let largest_acked = ranges.last().expect(
"ACK frames should always have at least one ack range",
);
for (_, p) in self.paths.iter_mut() {
if self.pkt_num_spaces[epoch]
.largest_tx_pkt_num
.is_some_and(|largest_sent| largest_sent < largest_acked)
{
return Err(Error::InvalidAckRange);
}
if is_app_limited {
p.recovery.delivery_rate_update_app_limited(true);
}
let OnAckReceivedOutcome {
lost_packets,
lost_bytes,
acked_bytes,
spurious_losses,
} = p.recovery.on_ack_received(
&ranges,
ack_delay,
epoch,
handshake_status,
now,
self.pkt_num_manager.skip_pn(),
&self.trace_id,
)?;
let skip_pn = self.pkt_num_manager.skip_pn();
let largest_acked =
p.recovery.get_largest_acked_on_epoch(epoch);
if let Some((largest_acked, skip_pn)) =
largest_acked.zip(skip_pn)
{
if largest_acked > skip_pn {
self.pkt_num_manager.set_skip_pn(None);
}
}
self.lost_count += lost_packets;
self.lost_bytes += lost_bytes as u64;
self.acked_bytes += acked_bytes as u64;
self.spurious_lost_count += spurious_losses;
}
},
frame::Frame::ResetStream {
stream_id,
error_code,
final_size,
} => {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState(stream_id));
}
let max_rx_data_left = self.max_rx_data() - self.rx_data;
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let was_readable = stream.is_readable();
let priority_key = Arc::clone(&stream.priority_key);
let stream::RecvBufResetReturn {
max_data_delta,
consumed_flowcontrol,
} = stream.recv.reset(error_code, final_size)?;
if max_data_delta > max_rx_data_left {
return Err(Error::FlowControl);
}
if !was_readable && stream.is_readable() {
self.streams.insert_readable(&priority_key);
}
self.rx_data += max_data_delta;
self.flow_control.add_consumed(consumed_flowcontrol);
self.reset_stream_remote_count =
self.reset_stream_remote_count.saturating_add(1);
},
frame::Frame::StopSending {
stream_id,
error_code,
} => {
if !stream::is_local(stream_id, self.is_server) &&
!stream::is_bidi(stream_id)
{
return Err(Error::InvalidStreamState(stream_id));
}
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let was_writable = stream.is_writable();
let priority_key = Arc::clone(&stream.priority_key);
if let Ok((final_size, unsent)) = stream.send.stop(error_code) {
self.tx_data = self.tx_data.saturating_sub(unsent);
self.tx_buffered =
self.tx_buffered.saturating_sub(unsent as usize);
qlog_with_type!(QLOG_DATA_MV, self.qlog, q, {
let ev_data = EventData::QuicStreamDataMoved(
qlog::events::quic::StreamDataMoved {
stream_id: Some(stream_id),
offset: Some(final_size),
raw: Some(RawInfo {
length: Some(unsent),
..Default::default()
}),
from: Some(DataRecipient::Transport),
to: Some(DataRecipient::Dropped),
..Default::default()
},
);
q.add_event_data_with_instant(ev_data, now).ok();
});
self.streams.insert_reset(stream_id, error_code, final_size);
if !was_writable {
self.streams.insert_writable(&priority_key);
}
self.stopped_stream_remote_count =
self.stopped_stream_remote_count.saturating_add(1);
self.reset_stream_local_count =
self.reset_stream_local_count.saturating_add(1);
}
},
frame::Frame::Crypto { data } => {
if data.max_off() >= MAX_CRYPTO_STREAM_OFFSET {
return Err(Error::CryptoBufferExceeded);
}
self.crypto_ctx[epoch].crypto_stream.recv.write(data)?;
let mut crypto_buf = [0; 512];
let level = crypto::Level::from_epoch(epoch);
let stream = &mut self.crypto_ctx[epoch].crypto_stream;
while let Ok((read, _)) = stream.recv.emit(&mut crypto_buf) {
let recv_buf = &crypto_buf[..read];
self.handshake.provide_data(level, recv_buf)?;
}
self.do_handshake(now)?;
},
frame::Frame::CryptoHeader { .. } => unreachable!(),
frame::Frame::NewToken { .. } =>
if self.is_server {
return Err(Error::InvalidPacket);
},
frame::Frame::Stream { stream_id, data } => {
if !stream::is_bidi(stream_id) &&
stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState(stream_id));
}
let max_rx_data_left = self.max_rx_data() - self.rx_data;
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let max_off_delta =
data.max_off().saturating_sub(stream.recv.max_off());
if max_off_delta > max_rx_data_left {
return Err(Error::FlowControl);
}
let was_readable = stream.is_readable();
let priority_key = Arc::clone(&stream.priority_key);
let was_draining = stream.recv.is_draining();
stream.recv.write(data)?;
if !was_readable && stream.is_readable() {
self.streams.insert_readable(&priority_key);
}
self.rx_data += max_off_delta;
if was_draining {
self.flow_control.add_consumed(max_off_delta);
}
},
frame::Frame::StreamHeader { .. } => unreachable!(),
frame::Frame::MaxData { max } => {
self.max_tx_data = cmp::max(self.max_tx_data, max);
},
frame::Frame::MaxStreamData { stream_id, max } => {
if !stream::is_bidi(stream_id) &&
!stream::is_local(stream_id, self.is_server)
{
return Err(Error::InvalidStreamState(stream_id));
}
let stream = match self.get_or_create_stream(stream_id, false) {
Ok(v) => v,
Err(Error::Done) => return Ok(()),
Err(e) => return Err(e),
};
let was_flushable = stream.is_flushable();
stream.send.update_max_data(max);
let writable = stream.is_writable();
let priority_key = Arc::clone(&stream.priority_key);
if stream.is_flushable() && !was_flushable {
let priority_key = Arc::clone(&stream.priority_key);
self.streams.insert_flushable(&priority_key);
}
if writable {
self.streams.insert_writable(&priority_key);
}
},
frame::Frame::MaxStreamsBidi { max } => {
if max > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
}
self.streams.update_peer_max_streams_bidi(max);
},
frame::Frame::MaxStreamsUni { max } => {
if max > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
}
self.streams.update_peer_max_streams_uni(max);
},
frame::Frame::DataBlocked { .. } => {
self.data_blocked_recv_count =
self.data_blocked_recv_count.saturating_add(1);
},
frame::Frame::StreamDataBlocked { .. } => {
self.stream_data_blocked_recv_count =
self.stream_data_blocked_recv_count.saturating_add(1);
},
frame::Frame::StreamsBlockedBidi { limit } => {
if limit > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
}
self.streams_blocked_bidi_recv_count =
self.streams_blocked_bidi_recv_count.saturating_add(1);
},
frame::Frame::StreamsBlockedUni { limit } => {
if limit > MAX_STREAM_ID {
return Err(Error::InvalidFrame);
}
self.streams_blocked_uni_recv_count =
self.streams_blocked_uni_recv_count.saturating_add(1);
},
frame::Frame::NewConnectionId {
seq_num,
retire_prior_to,
conn_id,
reset_token,
} => {
if self.ids.zero_length_dcid() {
return Err(Error::InvalidState);
}
let mut retired_path_ids = SmallVec::new();
let new_dcid_res = self.ids.new_dcid(
conn_id.into(),
seq_num,
u128::from_be_bytes(reset_token),
retire_prior_to,
&mut retired_path_ids,
);
for (dcid_seq, pid) in retired_path_ids {
let path = self.paths.get_mut(pid)?;
if path.active_dcid_seq != Some(dcid_seq) {
continue;
}
if let Some(new_dcid_seq) =
self.ids.lowest_available_dcid_seq()
{
path.active_dcid_seq = Some(new_dcid_seq);
self.ids.link_dcid_to_path_id(new_dcid_seq, pid)?;
trace!(
"{} path ID {} changed DCID: old seq num {} new seq num {}",
self.trace_id, pid, dcid_seq, new_dcid_seq,
);
} else {
path.active_dcid_seq = None;
trace!(
"{} path ID {} cannot be used; DCID seq num {} has been retired",
self.trace_id, pid, dcid_seq,
);
}
}
new_dcid_res?;
},
frame::Frame::RetireConnectionId { seq_num } => {
if self.ids.zero_length_scid() {
return Err(Error::InvalidState);
}
if let Some(pid) = self.ids.retire_scid(seq_num, &hdr.dcid)? {
let path = self.paths.get_mut(pid)?;
if path.active_scid_seq == Some(seq_num) {
path.active_scid_seq = None;
}
}
},
frame::Frame::PathChallenge { data } => {
self.path_challenge_rx_count += 1;
self.paths
.get_mut(recv_path_id)?
.on_challenge_received(data);
},
frame::Frame::PathResponse { data } => {
self.paths.on_response_received(data)?;
},
frame::Frame::ConnectionClose {
error_code, reason, ..
} => {
self.peer_error = Some(ConnectionError {
is_app: false,
error_code,
reason,
});
let path = self.paths.get_active()?;
self.draining_timer = Some(now + (path.recovery.pto() * 3));
},
frame::Frame::ApplicationClose { error_code, reason } => {
self.peer_error = Some(ConnectionError {
is_app: true,
error_code,
reason,
});
let path = self.paths.get_active()?;
self.draining_timer = Some(now + (path.recovery.pto() * 3));
},
frame::Frame::HandshakeDone => {
if self.is_server {
return Err(Error::InvalidPacket);
}
self.peer_verified_initial_address = true;
self.handshake_confirmed = true;
self.drop_epoch_state(packet::Epoch::Handshake, now);
},
frame::Frame::Datagram { data } => {
if !self.dgram_enabled() {
return Err(Error::InvalidState);
}
if self.dgram_recv_queue.is_full() {
self.dgram_recv_queue.pop();
}
self.dgram_recv_queue.push(data.into())?;
self.dgram_recv_count = self.dgram_recv_count.saturating_add(1);
let path = self.paths.get_mut(recv_path_id)?;
path.dgram_recv_count = path.dgram_recv_count.saturating_add(1);
},
frame::Frame::DatagramHeader { .. } => unreachable!(),
}
Ok(())
}
fn drop_epoch_state(&mut self, epoch: packet::Epoch, now: Instant) {
let crypto_ctx = &mut self.crypto_ctx[epoch];
if crypto_ctx.crypto_open.is_none() {
return;
}
crypto_ctx.clear();
self.pkt_num_spaces[epoch].clear();
let handshake_status = self.handshake_status();
for (_, p) in self.paths.iter_mut() {
p.recovery
.on_pkt_num_space_discarded(epoch, handshake_status, now);
}
trace!("{} dropped epoch {} state", self.trace_id, epoch);
}
fn max_rx_data(&self) -> u64 {
self.flow_control.max_data()
}
fn should_send_handshake_done(&self) -> bool {
self.is_established() && !self.handshake_done_sent && self.is_server
}
fn idle_timeout(&self) -> Option<Duration> {
if self.local_transport_params.max_idle_timeout == 0 &&
self.peer_transport_params.max_idle_timeout == 0
{
return None;
}
let idle_timeout = if self.local_transport_params.max_idle_timeout == 0 {
self.peer_transport_params.max_idle_timeout
} else if self.peer_transport_params.max_idle_timeout == 0 {
self.local_transport_params.max_idle_timeout
} else {
cmp::min(
self.local_transport_params.max_idle_timeout,
self.peer_transport_params.max_idle_timeout,
)
};
let path_pto = match self.paths.get_active() {
Ok(p) => p.recovery.pto(),
Err(_) => Duration::ZERO,
};
let idle_timeout = Duration::from_millis(idle_timeout);
let idle_timeout = cmp::max(idle_timeout, 3 * path_pto);
Some(idle_timeout)
}
fn handshake_status(&self) -> recovery::HandshakeStatus {
recovery::HandshakeStatus {
has_handshake_keys: self.crypto_ctx[packet::Epoch::Handshake]
.has_keys(),
peer_verified_address: self.peer_verified_initial_address,
completed: self.is_established(),
}
}
fn update_tx_cap(&mut self) {
let cwin_available = match self.paths.get_active() {
Ok(p) => p.recovery.cwnd_available() as u64,
Err(_) => 0,
};
let cap =
cmp::min(cwin_available, self.max_tx_data - self.tx_data) as usize;
self.tx_cap = (cap as f64 * self.tx_cap_factor).ceil() as usize;
}
fn delivery_rate_check_if_app_limited(&self) -> bool {
let cwin_available = self
.paths
.iter()
.filter(|&(_, p)| p.active())
.map(|(_, p)| p.recovery.cwnd_available())
.sum();
((self.tx_buffered + self.dgram_send_queue_byte_size()) < cwin_available) &&
(self.tx_data.saturating_sub(self.last_tx_data)) <
cwin_available as u64 &&
cwin_available > 0
}
fn check_tx_buffered_invariant(&mut self) {
if self.tx_buffered > 0 &&
!self.streams.has_flushable() &&
!self
.paths
.iter()
.any(|(_, p)| p.recovery.bytes_in_flight() > 0)
{
self.tx_buffered_state = TxBufferTrackingState::Inconsistent;
}
}
fn set_initial_dcid(
&mut self, cid: ConnectionId<'static>, reset_token: Option<u128>,
path_id: usize,
) -> Result<()> {
self.ids.set_initial_dcid(cid, reset_token, Some(path_id));
self.paths.get_mut(path_id)?.active_dcid_seq = Some(0);
Ok(())
}
fn get_or_create_recv_path_id(
&mut self, recv_pid: Option<usize>, dcid: &ConnectionId, buf_len: usize,
info: &RecvInfo,
) -> Result<usize> {
let ids = &mut self.ids;
let (in_scid_seq, mut in_scid_pid) =
ids.find_scid_seq(dcid).ok_or(Error::InvalidState)?;
if let Some(recv_pid) = recv_pid {
let recv_path = self.paths.get_mut(recv_pid)?;
let cid_entry =
recv_path.active_scid_seq.and_then(|v| ids.get_scid(v).ok());
if cid_entry.map(|e| &e.cid) != Some(dcid) {
let incoming_cid_entry = ids.get_scid(in_scid_seq)?;
let prev_recv_pid =
incoming_cid_entry.path_id.unwrap_or(recv_pid);
if prev_recv_pid != recv_pid {
trace!(
"{} peer reused CID {:?} from path {} on path {}",
self.trace_id,
dcid,
prev_recv_pid,
recv_pid
);
}
trace!(
"{} path ID {} now see SCID with seq num {}",
self.trace_id,
recv_pid,
in_scid_seq
);
recv_path.active_scid_seq = Some(in_scid_seq);
ids.link_scid_to_path_id(in_scid_seq, recv_pid)?;
}
return Ok(recv_pid);
}
if ids.zero_length_scid() {
in_scid_pid = None;
}
if let Some(in_scid_pid) = in_scid_pid {
let old_path = self.paths.get_mut(in_scid_pid)?;
let old_local_addr = old_path.local_addr();
let old_peer_addr = old_path.peer_addr();
trace!(
"{} reused CID seq {} of ({},{}) (path {}) on ({},{})",
self.trace_id,
in_scid_seq,
old_local_addr,
old_peer_addr,
in_scid_pid,
info.to,
info.from
);
self.paths.notify_event(PathEvent::ReusedSourceConnectionId(
in_scid_seq,
(old_local_addr, old_peer_addr),
(info.to, info.from),
));
}
let mut path = path::Path::new(
info.to,
info.from,
&self.recovery_config,
self.path_challenge_recv_max_queue_len,
false,
None,
);
path.max_send_bytes = buf_len * self.max_amplification_factor;
path.active_scid_seq = Some(in_scid_seq);
path.request_validation();
let pid = self.paths.insert_path(path, self.is_server)?;
if in_scid_pid.is_none() {
ids.link_scid_to_path_id(in_scid_seq, pid)?;
}
Ok(pid)
}
fn get_send_path_id(
&self, from: Option<SocketAddr>, to: Option<SocketAddr>,
) -> Result<usize> {
if self.is_established() {
let mut probing = self
.paths
.iter()
.filter(|(_, p)| from.is_none() || Some(p.local_addr()) == from)
.filter(|(_, p)| to.is_none() || Some(p.peer_addr()) == to)
.filter(|(_, p)| p.active_dcid_seq.is_some())
.filter(|(_, p)| p.probing_required())
.map(|(pid, _)| pid);
if let Some(pid) = probing.next() {
return Ok(pid);
}
}
if let Some((pid, p)) = self.paths.get_active_with_pid() {
if from.is_some() && Some(p.local_addr()) != from {
return Err(Error::Done);
}
if to.is_some() && Some(p.peer_addr()) != to {
return Err(Error::Done);
}
return Ok(pid);
};
Err(Error::InvalidState)
}
fn set_active_path(&mut self, path_id: usize, now: Instant) -> Result<()> {
if let Ok(old_active_path) = self.paths.get_active_mut() {
for &e in packet::Epoch::epochs(
packet::Epoch::Initial..=packet::Epoch::Application,
) {
let (lost_packets, lost_bytes) = old_active_path
.recovery
.on_path_change(e, now, &self.trace_id);
self.lost_count += lost_packets;
self.lost_bytes += lost_bytes as u64;
}
}
self.paths.set_active_path(path_id)
}
fn on_peer_migrated(
&mut self, new_pid: usize, disable_dcid_reuse: bool, now: Instant,
) -> Result<()> {
let active_path_id = self.paths.get_active_path_id()?;
if active_path_id == new_pid {
return Ok(());
}
self.set_active_path(new_pid, now)?;
let no_spare_dcid =
self.paths.get_mut(new_pid)?.active_dcid_seq.is_none();
if no_spare_dcid && !disable_dcid_reuse {
self.paths.get_mut(new_pid)?.active_dcid_seq =
self.paths.get_mut(active_path_id)?.active_dcid_seq;
}
Ok(())
}
fn create_path_on_client(
&mut self, local_addr: SocketAddr, peer_addr: SocketAddr,
) -> Result<usize> {
if self.is_server {
return Err(Error::InvalidState);
}
if !self.ids.zero_length_scid() && self.ids.available_scids() == 0 {
return Err(Error::OutOfIdentifiers);
}
let dcid_seq = if self.ids.zero_length_dcid() {
0
} else {
self.ids
.lowest_available_dcid_seq()
.ok_or(Error::OutOfIdentifiers)?
};
let mut path = path::Path::new(
local_addr,
peer_addr,
&self.recovery_config,
self.path_challenge_recv_max_queue_len,
false,
None,
);
path.active_dcid_seq = Some(dcid_seq);
let pid = self
.paths
.insert_path(path, false)
.map_err(|_| Error::OutOfIdentifiers)?;
self.ids.link_dcid_to_path_id(dcid_seq, pid)?;
Ok(pid)
}
fn mark_closed(&mut self) {
#[cfg(feature = "qlog")]
{
let cc = match (self.is_established(), self.timed_out, &self.peer_error, &self.local_error) {
(false, _, _, _) => qlog::events::quic::ConnectionClosed {
initiator: Some(TransportInitiator::Local),
connection_error: None,
application_error: None,
error_code: None,
internal_code: None,
reason: Some("Failed to establish connection".to_string()),
trigger: Some(qlog::events::quic::ConnectionClosedTrigger::HandshakeTimeout)
},
(true, true, _, _) => qlog::events::quic::ConnectionClosed {
initiator: Some(TransportInitiator::Local),
connection_error: None,
application_error: None,
error_code: None,
internal_code: None,
reason: Some("Idle timeout".to_string()),
trigger: Some(qlog::events::quic::ConnectionClosedTrigger::IdleTimeout)
},
(true, false, Some(peer_error), None) => {
let (connection_code, application_error, trigger) = if peer_error.is_app {
(None, Some(qlog::events::ApplicationError::Unknown), None)
} else {
let trigger = if peer_error.error_code == WireErrorCode::NoError as u64 {
Some(qlog::events::quic::ConnectionClosedTrigger::Clean)
} else {
Some(qlog::events::quic::ConnectionClosedTrigger::Error)
};
(Some(qlog::events::ConnectionClosedEventError::TransportError(qlog::events::quic::TransportError::Unknown)), None, trigger)
};
qlog::events::quic::ConnectionClosed {
initiator: Some(TransportInitiator::Remote),
connection_error: connection_code,
application_error,
error_code: Some(peer_error.error_code),
internal_code: None,
reason: Some(String::from_utf8_lossy(&peer_error.reason).to_string()),
trigger,
}
},
(true, false, None, Some(local_error)) => {
let (connection_code, application_error, trigger) = if local_error.is_app {
(None, Some(qlog::events::ApplicationError::Unknown), None)
} else {
let trigger = if local_error.error_code == WireErrorCode::NoError as u64 {
Some(qlog::events::quic::ConnectionClosedTrigger::Clean)
} else {
Some(qlog::events::quic::ConnectionClosedTrigger::Error)
};
(Some(qlog::events::ConnectionClosedEventError::TransportError(qlog::events::quic::TransportError::Unknown)), None, trigger)
};
qlog::events::quic::ConnectionClosed {
initiator: Some(TransportInitiator::Local),
connection_error: connection_code,
application_error,
error_code: Some(local_error.error_code),
internal_code: None,
reason: Some(String::from_utf8_lossy(&local_error.reason).to_string()),
trigger,
}
},
_ => qlog::events::quic::ConnectionClosed {
initiator: None,
connection_error: None,
application_error: None,
error_code: None,
internal_code: None,
reason: None,
trigger: None,
},
};
qlog_with_type!(QLOG_CONNECTION_CLOSED, self.qlog, q, {
let ev_data = EventData::QuicConnectionClosed(cc);
q.add_event_data_now(ev_data).ok();
});
self.qlog.streamer = None;
}
self.closed = true;
}
}
#[cfg(feature = "boringssl-boring-crate")]
impl<F: BufFactory> AsMut<boring::ssl::SslRef> for Connection<F> {
fn as_mut(&mut self) -> &mut boring::ssl::SslRef {
self.handshake.ssl_mut()
}
}
fn drop_pkt_on_err(
e: Error, recv_count: usize, is_server: bool, trace_id: &str,
) -> Error {
if is_server && recv_count == 0 {
return e;
}
trace!("{trace_id} dropped invalid packet");
Error::Done
}
struct AddrTupleFmt(SocketAddr, SocketAddr);
impl std::fmt::Display for AddrTupleFmt {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let AddrTupleFmt(src, dst) = &self;
if src.ip().is_unspecified() || dst.ip().is_unspecified() {
return Ok(());
}
f.write_fmt(format_args!("src:{src} dst:{dst}"))
}
}
#[derive(Clone, Default)]
pub struct Stats {
pub recv: usize,
pub sent: usize,
pub lost: usize,
pub spurious_lost: usize,
pub retrans: usize,
pub sent_bytes: u64,
pub recv_bytes: u64,
pub acked_bytes: u64,
pub lost_bytes: u64,
pub stream_retrans_bytes: u64,
pub dgram_recv: usize,
pub dgram_sent: usize,
pub paths_count: usize,
pub reset_stream_count_local: u64,
pub stopped_stream_count_local: u64,
pub reset_stream_count_remote: u64,
pub stopped_stream_count_remote: u64,
pub data_blocked_sent_count: u64,
pub stream_data_blocked_sent_count: u64,
pub data_blocked_recv_count: u64,
pub stream_data_blocked_recv_count: u64,
pub streams_blocked_bidi_recv_count: u64,
pub streams_blocked_uni_recv_count: u64,
pub path_challenge_rx_count: u64,
pub bytes_in_flight_duration: Duration,
pub tx_buffered_state: TxBufferTrackingState,
}
impl std::fmt::Debug for Stats {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"recv={} sent={} lost={} retrans={}",
self.recv, self.sent, self.lost, self.retrans,
)?;
write!(
f,
" sent_bytes={} recv_bytes={} lost_bytes={}",
self.sent_bytes, self.recv_bytes, self.lost_bytes,
)?;
Ok(())
}
}
#[doc(hidden)]
#[cfg(any(test, feature = "internal"))]
pub mod test_utils;
#[cfg(test)]
mod tests;
pub use crate::packet::ConnectionId;
pub use crate::packet::Header;
pub use crate::packet::Type;
pub use crate::path::PathEvent;
pub use crate::path::PathStats;
pub use crate::path::SocketAddrIter;
pub use crate::recovery::BbrBwLoReductionStrategy;
pub use crate::recovery::BbrParams;
pub use crate::recovery::CongestionControlAlgorithm;
pub use crate::recovery::StartupExit;
pub use crate::recovery::StartupExitReason;
pub use crate::stream::StreamIter;
pub use crate::transport_params::TransportParams;
pub use crate::transport_params::UnknownTransportParameter;
pub use crate::transport_params::UnknownTransportParameterIterator;
pub use crate::transport_params::UnknownTransportParameters;
pub use crate::buffers::BufFactory;
pub use crate::buffers::BufSplit;
pub use crate::error::ConnectionError;
pub use crate::error::Error;
pub use crate::error::Result;
pub use crate::error::WireErrorCode;
mod buffers;
mod cid;
mod crypto;
mod dgram;
mod error;
#[cfg(feature = "ffi")]
mod ffi;
mod flowcontrol;
mod frame;
pub mod h3;
mod minmax;
mod packet;
mod path;
mod pmtud;
mod rand;
mod range_buf;
mod ranges;
mod recovery;
mod stream;
mod tls;
mod transport_params;