1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//! Provides the handle that applications use to accept and open connections on an endpoint
use crate::{
connection,
connection::Connection,
endpoint::{close, close::CloseHandle, connect},
};
use core::{
pin::Pin,
task::{Context, Poll, Waker},
};
use futures_channel::mpsc;
use futures_core::Stream;
/// Held by the application. Used to accept new connections.
///
/// This is the receiving half of an unbounded channel of [`Connection`]s.
pub(crate) type AcceptorReceiver = mpsc::UnboundedReceiver<Connection>;
/// Held by the library. Used to notify the application of newly-accepted connections.
///
/// This is the sending half of an unbounded channel of [`Connection`]s.
pub(crate) type AcceptorSender = mpsc::UnboundedSender<Connection>;
/// Held by the library. Used to receive connection attempts from the application.
///
/// This is the receiving half of a bounded channel of [`connect::Request`]s.
pub(crate) type ConnectorReceiver = mpsc::Receiver<connect::Request>;
/// Held by the application. Used to submit connection attempts to the library.
///
/// This is the sending half of a bounded channel of [`connect::Request`]s.
pub(crate) type ConnectorSender = mpsc::Sender<connect::Request>;
/// Held by the library. Used to receive close attempts from the application.
///
/// Each message on this bounded channel is a [`Waker`].
// NOTE(review): presumably the waker of the task that requested the close, so it can be
// notified once closing completes — confirm against the `close` module.
pub(crate) type CloseReceiver = mpsc::Receiver<Waker>;
/// Held by the application. Used to submit connection close attempts to the library.
///
/// This is the sending half of the bounded channel of [`Waker`]s.
pub(crate) type CloseSender = mpsc::Sender<Waker>;
/// The [`Handle`] allows applications to accept and open QUIC connections on an `Endpoint`.
///
/// It bundles the two application-facing halves: an [`Acceptor`] for incoming
/// connections and a [`Connector`] for outgoing connection attempts and closing.
#[derive(Debug)]
pub(crate) struct Handle {
    /// Application-side half used to accept new connections.
    pub acceptor: Acceptor,
    /// Application-side half used to submit connection attempts and close requests.
    pub connector: Connector,
}

impl Handle {
    /// Creates a new `Handle` along with the library-side channel endpoints.
    ///
    /// `max_opening_connections` is passed as the buffer size of both the
    /// connection-attempt channel and the close channel; the channel of accepted
    /// connections is unbounded.
    ///
    /// The returned tuple contains, in order:
    /// - the `Handle` itself,
    /// - the [`AcceptorSender`] used to hand accepted connections to the handle,
    /// - the [`ConnectorReceiver`] on which connection attempts arrive,
    /// - the [`CloseHandle`] built from the close receiver and the shared endpoint state.
    pub(crate) fn new(
        max_opening_connections: usize,
    ) -> (Self, AcceptorSender, ConnectorReceiver, CloseHandle) {
        let (accepted_tx, accepted_rx) = mpsc::unbounded();
        let (connect_tx, connect_rx) = mpsc::channel(max_opening_connections);
        let (close_tx, close_rx) = mpsc::channel(max_opening_connections);

        // The same endpoint state is given to both the `Closer` (via a clone) and the
        // `CloseHandle`.
        let shared_state = close::EndpointState::default();

        let acceptor = Acceptor {
            acceptor: accepted_rx,
        };
        let connector = Connector {
            connector: connect_tx,
            closer: close::Closer::new(close_tx, shared_state.clone()),
        };
        let close_handle = CloseHandle::new(close_rx, shared_state);

        (
            Self {
                acceptor,
                connector,
            },
            accepted_tx,
            connect_rx,
            close_handle,
        )
    }
}
/// Application-side half of the accept channel; yields connections sent by the library.
#[derive(Debug)]
pub struct Acceptor {
    /// Receiving end of the unbounded channel of accepted connections.
    acceptor: AcceptorReceiver,
}

impl Acceptor {
    /// Polls for incoming connections and returns them.
    ///
    /// The result of polling the underlying channel receiver is returned unchanged:
    ///
    /// - `Poll::Ready(Some(connection))` if a connection was accepted.
    /// - `Poll::Ready(None)` if the acceptor is closed.
    /// - `Poll::Pending` if no new connection was accepted yet.
    ///
    /// When `Poll::Pending` is returned, the caller must retry polling as soon as a
    /// client establishes a connection. In order to notify the application of this
    /// condition, the method will save the [`core::task::Waker`] which is provided as
    /// part of the [`Context`] parameter, and notify it as soon as retrying the method
    /// will yield a different result.
    pub fn poll_accept(&mut self, context: &mut Context) -> Poll<Option<Connection>> {
        // The receiver's `poll_next` already produces `Poll<Option<Connection>>`, so it
        // is forwarded directly rather than re-wrapped variant by variant.
        Stream::poll_next(Pin::new(&mut self.acceptor), context)
    }
}
/// Application-side half used to open connections and to close the endpoint.
#[derive(Clone, Debug)]
pub struct Connector {
    /// Sending end of the bounded channel of connection requests.
    connector: ConnectorSender,
    /// Delegate that handles close requests.
    closer: close::Closer,
}

impl Connector {
    /// Attempts to establish a connection to an endpoint.
    ///
    /// Builds a [`connect::Attempt`] from this connector's request sender and the
    /// given `connect` parameters, and returns it as a future to be awaited.
    pub fn connect(&self, connect: connect::Connect) -> connect::Attempt {
        let request_sender = &self.connector;
        connect::Attempt::new(request_sender, connect)
    }

    /// Polls to close the endpoint.
    ///
    /// The call is delegated to the inner closer and its result is returned as-is.
    pub fn poll_close(&mut self, context: &mut Context) -> Poll<Result<(), connection::Error>> {
        let Self { closer, .. } = self;
        closer.poll_close(context)
    }
}