use std::{
any::Any,
fmt,
future::Future,
io,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker, ready},
};
use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
use tracing::{Instrument, Span, debug_span};
use crate::{
ConnectionEvent, Duration, Instant, VarInt,
mutex::Mutex,
recv_stream::RecvStream,
runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
send_stream::SendStream,
udp_transmit,
};
use proto::{
ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, Side, StreamEvent,
StreamId, congestion::Controller,
};
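/// In-progress connection attempt future
///
/// Resolves to a [`Connection`] once the handshake completes. Rough usage sketch (the
/// `endpoint` and address below are placeholders, not defined in this module):
///
/// ```ignore
/// let connection = endpoint.connect(server_addr, "example.com")?.await?;
/// ```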
#[derive(Debug)]
pub struct Connecting {
conn: Option<ConnectionRef>,
connected: oneshot::Receiver<bool>,
handshake_data_ready: Option<oneshot::Receiver<()>>,
}
impl Connecting {
pub(crate) fn new(
handle: ConnectionHandle,
conn: proto::Connection,
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
socket: Arc<dyn AsyncUdpSocket>,
runtime: Arc<dyn Runtime>,
) -> Self {
let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
let (on_connected_send, on_connected_recv) = oneshot::channel();
let conn = ConnectionRef::new(
handle,
conn,
endpoint_events,
conn_events,
on_handshake_data_send,
on_connected_send,
socket,
runtime.clone(),
);
let driver = ConnectionDriver(conn.clone());
runtime.spawn(Box::pin(
async {
if let Err(e) = driver.await {
tracing::error!("I/O error: {e}");
}
}
.instrument(Span::current()),
));
Self {
conn: Some(conn),
connected: on_connected_recv,
handshake_data_ready: Some(on_handshake_data_recv),
}
}
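/// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
///
/// Returns `Err(self)` if early data cannot be sent yet (e.g. a client without 0-RTT keys).
/// Note that 0-RTT data is vulnerable to replay attacks.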
pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
let conn = self.conn.as_mut().unwrap().state.lock("into_0rtt");
let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
drop(conn);
if is_ok {
let conn = self.conn.take().unwrap();
Ok((Connection(conn), ZeroRttAccepted(self.connected)))
} else {
Err(self)
}
}
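/// Parameters negotiated during the handshake
///
/// Resolves as soon as handshake data becomes available, or fails with the connection error.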
pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
if let Some(x) = self.handshake_data_ready.take() {
let _ = x.await;
}
let conn = self.conn.as_ref().unwrap();
let inner = conn.state.lock("handshake");
inner
.inner
.crypto_session()
.handshake_data()
.ok_or_else(|| {
inner
.error
.clone()
.expect("spurious handshake data ready notification")
})
}
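/// The local IP address used to communicate with the peer, if known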
pub fn local_ip(&self) -> Option<IpAddr> {
let conn = self.conn.as_ref().unwrap();
let inner = conn.state.lock("local_ip");
inner.inner.local_ip()
}
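/// The peer's UDP address
///
/// Panics if called after `poll` has returned `Ready`.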
pub fn remote_address(&self) -> SocketAddr {
let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
conn_ref.state.lock("remote_address").inner.remote_address()
}
}
impl Future for Connecting {
type Output = Result<Connection, ConnectionError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.connected).poll(cx).map(|_| {
let conn = self.conn.take().unwrap();
let inner = conn.state.lock("connecting");
if inner.connected {
drop(inner);
Ok(Connection(conn))
} else {
Err(inner
.error
.clone()
.expect("connected signaled without connection success or error"))
}
})
}
}
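/// Future that completes when a connection is fully established
///
/// For clients, the resolved value indicates whether the peer accepted 0-RTT data.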
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);
impl Future for ZeroRttAccepted {
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
}
}
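/// A future that drives protocol logic for a connection
///
/// Spawned onto the runtime when the connection is created; it performs I/O, timer handling,
/// and event forwarding on behalf of the connection.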
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);
impl Future for ConnectionDriver {
type Output = Result<(), io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let conn = &mut *self.0.state.lock("poll");
let span = debug_span!("drive", id = conn.handle.0);
let _guard = span.enter();
if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
conn.terminate(e, &self.0.shared);
return Poll::Ready(Ok(()));
}
let mut keep_going = conn.drive_transmit(cx)?;
keep_going |= conn.drive_timer(cx);
conn.forward_endpoint_events();
conn.forward_app_events(&self.0.shared);
if !conn.inner.is_drained() {
if keep_going {
cx.waker().wake_by_ref();
} else {
conn.driver = Some(cx.waker().clone());
}
return Poll::Pending;
}
if conn.error.is_none() {
unreachable!("drained connections always have an error");
}
Poll::Ready(Ok(()))
}
}
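/// A QUIC connection
///
/// May be cloned to obtain additional handles to the same connection; the connection is
/// closed implicitly once the last handle is dropped. Sketch of typical stream usage
/// (assuming an established `conn: Connection`; the stream methods shown come from the
/// public API and may differ between versions):
///
/// ```ignore
/// let (mut send, mut recv) = conn.open_bi().await?;
/// send.write_all(b"ping").await?;
/// send.finish()?;
/// let reply = recv.read_to_end(64 * 1024).await?;
/// ```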
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);
impl Connection {
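/// Initiate an outgoing unidirectional stream
///
/// Completes once the peer's stream limit permits another stream to be opened.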
pub fn open_uni(&self) -> OpenUni<'_> {
OpenUni {
conn: &self.0,
notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
}
}
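/// Initiate an outgoing bidirectional stream
///
/// Completes once the peer's stream limit permits another stream to be opened.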
pub fn open_bi(&self) -> OpenBi<'_> {
OpenBi {
conn: &self.0,
notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
}
}
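/// Accept the next incoming unidirectional stream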
pub fn accept_uni(&self) -> AcceptUni<'_> {
AcceptUni {
conn: &self.0,
notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
}
}
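/// Accept the next incoming bidirectional stream
///
/// A stream opened by the peer only becomes available here once the peer has sent data on it.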
pub fn accept_bi(&self) -> AcceptBi<'_> {
AcceptBi {
conn: &self.0,
notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
}
}
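/// Receive the next application datagram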
pub fn read_datagram(&self) -> ReadDatagram<'_> {
ReadDatagram {
conn: &self.0,
notify: self.0.shared.datagram_received.notified(),
}
}
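/// Wait for the connection to be closed for any reason
///
/// Returns the close reason immediately if the connection is already closed.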
pub async fn closed(&self) -> ConnectionError {
{
let conn = self.0.state.lock("closed");
if let Some(error) = conn.error.as_ref() {
return error.clone();
}
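// Construct the `Notified` future while the lock is held so a wakeup fired right after
// the lock is released can't be missed; it is awaited only after the guard is dropped.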
self.0.shared.closed.notified()
}
.await;
self.0
.state
.lock("closed")
.error
.as_ref()
.expect("closed without an error")
.clone()
}
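/// The reason the connection was closed, or `None` if it is still open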
pub fn close_reason(&self) -> Option<ConnectionError> {
self.0.state.lock("close_reason").error.clone()
}
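/// Close the connection immediately
///
/// Pending operations fail with `ConnectionError::LocallyClosed`; delivery of data already
/// queued on streams is not guaranteed.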
pub fn close(&self, error_code: VarInt, reason: &[u8]) {
let conn = &mut *self.0.state.lock("close");
conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
}
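/// Transmit `data` as an unreliable, unordered application datagram
///
/// Never blocks; if the send buffer is full, older unsent datagrams may be discarded to
/// make room.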
pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
let conn = &mut *self.0.state.lock("send_datagram");
if let Some(ref x) = conn.error {
return Err(SendDatagramError::ConnectionLost(x.clone()));
}
use proto::SendDatagramError::*;
match conn.inner.datagrams().send(data, true) {
Ok(()) => {
conn.wake();
Ok(())
}
Err(e) => Err(match e {
Blocked(..) => unreachable!(),
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
}),
}
}
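/// Like `send_datagram`, but waits for buffer space when congested instead of discarding
/// previously queued datagrams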
pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
SendDatagram {
conn: &self.0,
data: Some(data),
notify: self.0.shared.datagrams_unblocked.notified(),
}
}
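/// Maximum size of a datagram that can currently be sent, or `None` if datagrams are
/// unsupported by the peer or disabled locally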
pub fn max_datagram_size(&self) -> Option<usize> {
self.0
.state
.lock("max_datagram_size")
.inner
.datagrams()
.max_size()
}
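/// Bytes available in the outgoing datagram send buffer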
pub fn datagram_send_buffer_space(&self) -> usize {
self.0
.state
.lock("datagram_send_buffer_space")
.inner
.datagrams()
.send_buffer_space()
}
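/// Whether this connection plays the client or the server role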
pub fn side(&self) -> Side {
self.0.state.lock("side").inner.side()
}
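/// The peer's UDP address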
pub fn remote_address(&self) -> SocketAddr {
self.0.state.lock("remote_address").inner.remote_address()
}
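/// The local IP address used to communicate with the peer, if known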
pub fn local_ip(&self) -> Option<IpAddr> {
self.0.state.lock("local_ip").inner.local_ip()
}
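/// Current best estimate of this connection's round-trip time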
pub fn rtt(&self) -> Duration {
self.0.state.lock("rtt").inner.rtt()
}
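/// Returns connection statistics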
pub fn stats(&self) -> ConnectionStats {
self.0.state.lock("stats").inner.stats()
}
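/// Current state of the congestion control algorithm, for debugging purposes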
pub fn congestion_state(&self) -> Box<dyn Controller> {
self.0
.state
.lock("congestion_state")
.inner
.congestion_state()
.clone_box()
}
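/// Parameters negotiated during the handshake, if available yet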
pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
self.0
.state
.lock("handshake_data")
.inner
.crypto_session()
.handshake_data()
}
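/// Cryptographic identity of the peer; the concrete type depends on the crypto session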
pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
self.0
.state
.lock("peer_identity")
.inner
.crypto_session()
.peer_identity()
}
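/// A stable identifier for this connection
///
/// Remains fixed for the connection's lifetime, unlike peer addresses or connection IDs.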
pub fn stable_id(&self) -> usize {
self.0.stable_id()
}
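/// Initiate a QUIC key update, refreshing the keys used to protect packets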
pub fn force_key_update(&self) {
self.0
.state
.lock("force_key_update")
.inner
.force_key_update()
}
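/// Derive keying material from this connection's TLS session secrets
///
/// Peers calling this with identical `label`, `context`, and output length obtain the same
/// bytes.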
pub fn export_keying_material(
&self,
output: &mut [u8],
label: &[u8],
context: &[u8],
) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
self.0
.state
.lock("export_keying_material")
.inner
.crypto_session()
.export_keying_material(output, label, context)
}
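/// Modify the number of remotely initiated unidirectional streams that may be concurrently
/// open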
pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
conn.inner.set_max_concurrent_streams(Dir::Uni, count);
conn.wake();
}
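/// Adjust the connection's send window at runtime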
pub fn set_send_window(&self, send_window: u64) {
let mut conn = self.0.state.lock("set_send_window");
conn.inner.set_send_window(send_window);
conn.wake();
}
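/// Adjust the connection-level flow control receive window at runtime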
pub fn set_receive_window(&self, receive_window: VarInt) {
let mut conn = self.0.state.lock("set_receive_window");
conn.inner.set_receive_window(receive_window);
conn.wake();
}
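/// Modify the number of remotely initiated bidirectional streams that may be concurrently
/// open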
pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
conn.inner.set_max_concurrent_streams(Dir::Bi, count);
conn.wake();
}
}
pin_project! {
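/// Future produced by [`Connection::open_uni`]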
pub struct OpenUni<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}
impl Future for OpenUni<'_> {
type Output = Result<SendStream, ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
}
}
pin_project! {
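/// Future produced by [`Connection::open_bi`]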
pub struct OpenBi<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}
impl Future for OpenBi<'_> {
type Output = Result<(SendStream, RecvStream), ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;
Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}
fn poll_open<'a>(
ctx: &mut Context<'_>,
conn: &'a ConnectionRef,
mut notify: Pin<&mut Notified<'a>>,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut state = conn.state.lock("poll_open");
if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
} else if let Some(id) = state.inner.streams().open(dir) {
let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
drop(state);
return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
}
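// `state` is still locked, so readiness can't change between the check above and registering
// interest here; a `Ready` poll is a stale notification, so re-arm and poll again.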
loop {
match notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {
notify.set(conn.shared.stream_budget_available[dir as usize].notified())
}
}
}
}
pin_project! {
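/// Future produced by [`Connection::accept_uni`]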
pub struct AcceptUni<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}
impl Future for AcceptUni<'_> {
type Output = Result<RecvStream, ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
}
}
pin_project! {
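/// Future produced by [`Connection::accept_bi`]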
pub struct AcceptBi<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}
impl Future for AcceptBi<'_> {
type Output = Result<(SendStream, RecvStream), ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}
fn poll_accept<'a>(
ctx: &mut Context<'_>,
conn: &'a ConnectionRef,
mut notify: Pin<&mut Notified<'a>>,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut state = conn.state.lock("poll_accept");
if let Some(id) = state.inner.streams().accept(dir) {
let is_0rtt = state.inner.is_handshaking();
state.wake();
drop(state);
return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
} else if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
}
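// `state` remains locked, so no incoming stream can slip in unnoticed; a `Ready` poll here
// is a stale wakeup, so re-arm and poll again.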
loop {
match notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
}
}
}
pin_project! {
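/// Future produced by [`Connection::read_datagram`]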
pub struct ReadDatagram<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}
impl Future for ReadDatagram<'_> {
type Output = Result<Bytes, ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("ReadDatagram::poll");
if let Some(x) = state.inner.datagrams().recv() {
return Poll::Ready(Ok(x));
} else if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
}
loop {
match this.notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagram_received.notified()),
}
}
}
}
pin_project! {
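/// Future produced by [`Connection::send_datagram_wait`]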
pub struct SendDatagram<'a> {
conn: &'a ConnectionRef,
data: Option<Bytes>,
#[pin]
notify: Notified<'a>,
}
}
impl Future for SendDatagram<'_> {
type Output = Result<(), SendDatagramError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("SendDatagram::poll");
if let Some(ref e) = state.error {
return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
}
use proto::SendDatagramError::*;
match state
.inner
.datagrams()
.send(this.data.take().unwrap(), false)
{
Ok(()) => {
state.wake();
Poll::Ready(Ok(()))
}
Err(e) => Poll::Ready(Err(match e {
Blocked(data) => {
this.data.replace(data);
loop {
match this.notify.as_mut().poll(ctx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagrams_unblocked.notified()),
}
}
}
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
Disabled => SendDatagramError::Disabled,
TooLarge => SendDatagramError::TooLarge,
})),
}
}
}
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);
impl ConnectionRef {
#[allow(clippy::too_many_arguments)]
fn new(
handle: ConnectionHandle,
conn: proto::Connection,
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
on_handshake_data: oneshot::Sender<()>,
on_connected: oneshot::Sender<bool>,
socket: Arc<dyn AsyncUdpSocket>,
runtime: Arc<dyn Runtime>,
) -> Self {
Self(Arc::new(ConnectionInner {
state: Mutex::new(State {
inner: conn,
driver: None,
handle,
on_handshake_data: Some(on_handshake_data),
on_connected: Some(on_connected),
connected: false,
timer: None,
timer_deadline: None,
conn_events,
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
stopped: FxHashMap::default(),
error: None,
ref_count: 0,
io_poller: socket.clone().create_io_poller(),
socket,
runtime,
send_buffer: Vec::new(),
buffered_transmit: None,
}),
shared: Shared::default(),
}))
}
fn stable_id(&self) -> usize {
&*self.0 as *const _ as usize
}
}
impl Clone for ConnectionRef {
fn clone(&self) -> Self {
self.state.lock("clone").ref_count += 1;
Self(self.0.clone())
}
}
impl Drop for ConnectionRef {
fn drop(&mut self) {
let conn = &mut *self.state.lock("drop");
if let Some(x) = conn.ref_count.checked_sub(1) {
conn.ref_count = x;
if x == 0 && !conn.inner.is_closed() {
conn.implicit_close(&self.shared);
}
}
}
}
impl std::ops::Deref for ConnectionRef {
type Target = ConnectionInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug)]
pub(crate) struct ConnectionInner {
pub(crate) state: Mutex<State>,
pub(crate) shared: Shared,
}
#[derive(Debug, Default)]
pub(crate) struct Shared {
stream_budget_available: [Notify; 2],
stream_incoming: [Notify; 2],
datagram_received: Notify,
datagrams_unblocked: Notify,
closed: Notify,
}
pub(crate) struct State {
pub(crate) inner: proto::Connection,
driver: Option<Waker>,
handle: ConnectionHandle,
on_handshake_data: Option<oneshot::Sender<()>>,
on_connected: Option<oneshot::Sender<bool>>,
connected: bool,
timer: Option<Pin<Box<dyn AsyncTimer>>>,
timer_deadline: Option<Instant>,
conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
pub(crate) stopped: FxHashMap<StreamId, Arc<Notify>>,
pub(crate) error: Option<ConnectionError>,
ref_count: usize,
socket: Arc<dyn AsyncUdpSocket>,
io_poller: Pin<Box<dyn UdpPoller>>,
runtime: Arc<dyn Runtime>,
send_buffer: Vec<u8>,
buffered_transmit: Option<proto::Transmit>,
}
impl State {
fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
let now = self.runtime.now();
let mut transmits = 0;
let max_datagrams = self
.socket
.max_transmit_segments()
.min(MAX_TRANSMIT_SEGMENTS);
loop {
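// Resume a transmit that previously couldn't be sent, or poll the protocol layer for a new one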
let t = match self.buffered_transmit.take() {
Some(t) => t,
None => {
self.send_buffer.clear();
self.send_buffer.reserve(self.inner.current_mtu() as usize);
match self
.inner
.poll_transmit(now, max_datagrams, &mut self.send_buffer)
{
Some(t) => {
transmits += match t.segment_size {
None => 1,
Some(s) => t.size.div_ceil(s),
};
t
}
None => break,
}
}
};
if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
self.buffered_transmit = Some(t);
return Ok(false);
}
let len = t.size;
let retry = match self
.socket
.try_send(&udp_transmit(&t, &self.send_buffer[..len]))
{
Ok(()) => false,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
Err(e) => return Err(e),
};
if retry {
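// The poller reported writable but the send still failed; keep the transmit buffered and
// loop so that either `poll_writable` registers a wakeup or a transient failure clears.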
self.buffered_transmit = Some(t);
continue;
}
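// Bound the work done per poll so the driver yields and other tasks (e.g. ACK processing)
// can run.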
if transmits >= MAX_TRANSMIT_DATAGRAMS {
return Ok(true);
}
}
Ok(false)
}
fn forward_endpoint_events(&mut self) {
while let Some(event) = self.inner.poll_endpoint_events() {
let _ = self.endpoint_events.send((self.handle, event));
}
}
fn process_conn_events(
&mut self,
shared: &Shared,
cx: &mut Context,
) -> Result<(), ConnectionError> {
loop {
match self.conn_events.poll_recv(cx) {
Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
self.socket = socket;
self.io_poller = self.socket.clone().create_io_poller();
self.inner.local_address_changed();
}
Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
self.inner.handle_event(event);
}
Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
self.close(error_code, reason, shared);
}
Poll::Ready(None) => {
return Err(ConnectionError::TransportError(proto::TransportError {
code: proto::TransportErrorCode::INTERNAL_ERROR,
frame: None,
reason: "endpoint driver future was dropped".to_string(),
}));
}
Poll::Pending => {
return Ok(());
}
}
}
}
fn forward_app_events(&mut self, shared: &Shared) {
while let Some(event) = self.inner.poll() {
use proto::Event::*;
match event {
HandshakeDataReady => {
if let Some(x) = self.on_handshake_data.take() {
let _ = x.send(());
}
}
Connected => {
self.connected = true;
if let Some(x) = self.on_connected.take() {
let _ = x.send(self.inner.accepted_0rtt());
}
if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
wake_all(&mut self.blocked_writers);
wake_all(&mut self.blocked_readers);
wake_all_notify(&mut self.stopped);
}
}
ConnectionLost { reason } => {
self.terminate(reason, shared);
}
Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
}
Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
}
DatagramReceived => {
shared.datagram_received.notify_waiters();
}
DatagramsUnblocked => {
shared.datagrams_unblocked.notify_waiters();
}
Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
Stream(StreamEvent::Available { dir }) => {
shared.stream_budget_available[dir as usize].notify_waiters();
}
Stream(StreamEvent::Finished { id }) => wake_stream_notify(id, &mut self.stopped),
Stream(StreamEvent::Stopped { id, .. }) => {
wake_stream_notify(id, &mut self.stopped);
wake_stream(id, &mut self.blocked_writers);
}
}
}
}
fn drive_timer(&mut self, cx: &mut Context) -> bool {
match self.inner.poll_timeout() {
Some(deadline) => {
if let Some(delay) = &mut self.timer {
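// Only reset the timer if the deadline actually changed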
if self
.timer_deadline
.map(|current_deadline| current_deadline != deadline)
.unwrap_or(true)
{
delay.as_mut().reset(deadline);
}
} else {
self.timer = Some(self.runtime.new_timer(deadline));
}
self.timer_deadline = Some(deadline);
}
None => {
self.timer_deadline = None;
return false;
}
}
if self.timer_deadline.is_none() {
return false;
}
let delay = self
.timer
.as_mut()
.expect("timer must exist in this state")
.as_mut();
if delay.poll(cx).is_pending() {
return false;
}
self.inner.handle_timeout(self.runtime.now());
self.timer_deadline = None;
true
}
pub(crate) fn wake(&mut self) {
if let Some(x) = self.driver.take() {
x.wake();
}
}
fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
self.error = Some(reason.clone());
if let Some(x) = self.on_handshake_data.take() {
let _ = x.send(());
}
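// Wake every task blocked on this connection so it observes the error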
wake_all(&mut self.blocked_writers);
wake_all(&mut self.blocked_readers);
shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
shared.datagram_received.notify_waiters();
shared.datagrams_unblocked.notify_waiters();
if let Some(x) = self.on_connected.take() {
let _ = x.send(false);
}
wake_all_notify(&mut self.stopped);
shared.closed.notify_waiters();
}
fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
self.inner.close(self.runtime.now(), error_code, reason);
self.terminate(ConnectionError::LocallyClosed, shared);
self.wake();
}
pub(crate) fn implicit_close(&mut self, shared: &Shared) {
self.close(0u32.into(), Bytes::new(), shared);
}
pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
if self.inner.is_handshaking()
|| self.inner.accepted_0rtt()
|| self.inner.side().is_server()
{
Ok(())
} else {
Err(())
}
}
}
impl Drop for State {
fn drop(&mut self) {
if !self.inner.is_drained() {
let _ = self
.endpoint_events
.send((self.handle, proto::EndpointEvent::drained()));
}
}
}
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("State").field("inner", &self.inner).finish()
}
}
fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
if let Some(waker) = wakers.remove(&stream_id) {
waker.wake();
}
}
fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
wakers.drain().for_each(|(_, waker)| waker.wake())
}
fn wake_stream_notify(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
if let Some(notify) = wakers.remove(&stream_id) {
notify.notify_waiters()
}
}
fn wake_all_notify(wakers: &mut FxHashMap<StreamId, Arc<Notify>>) {
wakers
.drain()
.for_each(|(_, notify)| notify.notify_waiters())
}
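/// Errors that arise from sending a datagram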
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
#[error("datagrams not supported by peer")]
UnsupportedByPeer,
#[error("datagram support disabled")]
Disabled,
#[error("datagram too large")]
TooLarge,
#[error("connection lost")]
ConnectionLost(#[from] ConnectionError),
}
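/// The maximum number of datagrams produced in a single `drive_transmit` call
///
/// Limits the CPU spent on datagram generation per poll so other tasks (such as processing
/// ACKs) can run in between.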
const MAX_TRANSMIT_DATAGRAMS: usize = 20;
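/// Upper bound on the number of GSO segments requested from the socket per transmit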
const MAX_TRANSMIT_SEGMENTS: usize = 10;