// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::{
    connection::{self, Connection},
    endpoint::handle::ConnectorSender,
};
use core::{
    fmt,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use futures_channel::oneshot;
use s2n_quic_core::{application::ServerName, inet::SocketAddress, path::RemoteAddress};
/// Held by the connection `Attempt` future. Used to receive the actual connection.
pub(crate) type ConnectionReceiver = oneshot::Receiver<Result<Connection, connection::Error>>;
/// Held within the library `connection_container`. Used to send the actual connection once
/// it's been created.
pub(crate) type ConnectionSender = oneshot::Sender<Result<Connection, connection::Error>>;
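/// Describes an outgoing connection attempt: the remote address and,
/// optionally, the server name to use for the connection.
///
/// # Examples
///
/// A minimal sketch of building a `Connect` value. The address, port, and
/// hostname are placeholders, and the import path assumes the type is
/// re-exported as `s2n_quic::client::Connect`.
///
/// ```ignore
/// use s2n_quic::client::Connect;
/// use std::net::SocketAddr;
///
/// // Placeholder address; any type that converts into `SocketAddress` works.
/// let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
///
/// // Attach the optional server name with the builder-style method.
/// let connect = Connect::new(addr).with_server_name("localhost");
///
/// // The plain `Display` form prefers the server name when one is set.
/// assert_eq!(connect.to_string(), "localhost");
/// ```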
#[derive(Clone, Debug)]
pub struct Connect {
    pub(crate) remote_address: RemoteAddress,
    pub(crate) server_name: Option<ServerName>,
}
impl fmt::Display for Connect {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if f.alternate() {
            if let Some(hostname) = self.server_name.as_deref() {
                write!(f, "{hostname} at {}", &*self.remote_address)
            } else {
                write!(f, "{}", &*self.remote_address)
            }
        } else if let Some(hostname) = self.server_name.as_deref() {
            write!(f, "{hostname}")
        } else {
            write!(f, "{}", &*self.remote_address)
        }
    }
}
impl Connect {
    /// Creates a connection attempt with the specified remote address
    pub fn new<Addr: Into<SocketAddress>>(addr: Addr) -> Self {
        Self {
            remote_address: addr.into().into(),
            server_name: None,
        }
    }

    /// Specifies the server name to use for the connection
    #[must_use]
    pub fn with_server_name<Name: Into<ServerName>>(self, server_name: Name) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
/// Make it easy for applications to create a connection attempt without importing the `Connect` struct
impl<T: Into<SocketAddress>> From<T> for Connect {
    fn from(addr: T) -> Self {
        Self::new(addr)
    }
}
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct Request {
    pub connect: Connect,
    pub sender: ConnectionSender,
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Attempt {
    state: AttemptState,
}
impl Attempt {
    /// Creates a connection attempt
    ///
    /// The flow is currently implemented as follows:
    ///
    /// * The application provides a `Connect` struct containing information for the remote endpoint
    /// * The attempt creates a oneshot channel and creates a `Request` with the sender and `Connect` struct
    /// * The attempt returns a `Self` while holding on to the oneshot receiver
    /// * The application polls the `Attempt` until either a successful `Connection` or `connection::Error` is
    ///   received over the oneshot receiver.
    pub(crate) fn new(opener: &ConnectorSender, connect: Connect) -> Self {
        // open a oneshot channel to receive the connection or error after the endpoint has attempted the handshake
        let (response, receiver) = oneshot::channel();

        // The request includes both the connection info and the response oneshot channel
        let request = Request {
            connect,
            sender: response,
        };

        Self {
            state: AttemptState::Connect(request, opener.clone(), receiver),
        }
    }
}
enum AttemptState {
    /// The attempt is currently waiting for capacity in the `ConnectorSender` to make the `Request`
    Connect(Request, ConnectorSender, ConnectionReceiver),
    /// The attempt is currently waiting for a response back from the endpoint on the `ConnectionReceiver`
    Waiting(ConnectionReceiver),
    /// This is an intermediate state and should not persist across calls to `poll`
    Unreachable,
}
impl Future for Attempt {
    type Output = Result<Connection, connection::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // take ownership of the current state, leaving a placeholder that must be
            // overwritten before returning `Poll::Pending`
            match core::mem::replace(&mut self.state, AttemptState::Unreachable) {
                AttemptState::Connect(request, mut opener, response) => {
                    match opener.poll_ready(cx) {
                        Poll::Ready(Ok(())) => {
                            match opener.try_send(request) {
                                Ok(_) => {
                                    // transition to the waiting state
                                    self.state = AttemptState::Waiting(response);
                                    continue;
                                }
                                Err(err) if err.is_full() => {
                                    // reset to the original state
                                    self.state =
                                        AttemptState::Connect(err.into_inner(), opener, response);

                                    // yield and wake up the task since the opener misreported its ready state
                                    cx.waker().wake_by_ref();
                                }
                                Err(_) => {
                                    // The endpoint has closed
                                    return Err(connection::Error::unspecified()).into();
                                }
                            }
                        }
                        Poll::Ready(Err(_)) => {
                            // The endpoint has closed
                            return Err(connection::Error::unspecified()).into();
                        }
                        Poll::Pending => {
                            // reset to the original state
                            self.state = AttemptState::Connect(request, opener, response);
                        }
                    }

                    return Poll::Pending;
                }
                AttemptState::Waiting(mut response) => {
                    return match Pin::new(&mut response).poll(cx) {
                        Poll::Ready(Ok(res)) => Poll::Ready(res),
                        Poll::Ready(Err(_)) => {
                            // The endpoint has closed
                            Err(connection::Error::unspecified()).into()
                        }
                        Poll::Pending => {
                            self.state = AttemptState::Waiting(response);
                            Poll::Pending
                        }
                    };
                }
                AttemptState::Unreachable => {
                    unreachable!(
                        "Unreachable is an intermediate state and should not exist across polls"
                    );
                }
            }
        }
    }
}
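
// A small sketch of a unit test for the `Display` behavior of `Connect`. The
// address and hostname are placeholders, and the test assumes `ServerName`
// converts from `&str` and `SocketAddress` from `std::net::SocketAddr`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    #[test]
    fn display_prefers_server_name() {
        let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
        let connect = Connect::new(addr).with_server_name("localhost");

        // `{}` shows only the server name when one is set
        assert_eq!(connect.to_string(), "localhost");

        // `{:#}` shows both the server name and the remote address
        assert_eq!(
            format!("{connect:#}"),
            format!("localhost at {}", &*connect.remote_address)
        );
    }

    #[test]
    fn display_falls_back_to_address() {
        let addr: SocketAddr = "127.0.0.1:4433".parse().unwrap();
        let connect = Connect::new(addr);

        // without a server name, both forms show the remote address
        let expected = format!("{}", &*connect.remote_address);
        assert_eq!(connect.to_string(), expected);
        assert_eq!(format!("{connect:#}"), expected);
    }
}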