1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! Handles that allow applications to accept and open connections on an endpoint

use crate::{
    connection,
    connection::Connection,
    endpoint::{close, close::CloseHandle, connect},
};
use core::{
    pin::Pin,
    task::{Context, Poll, Waker},
};
use futures_channel::mpsc;
use futures_core::Stream;

/// Held by the application. Used to accept new connections.
///
/// Receiving half of an unbounded channel carrying [`Connection`] values.
pub(crate) type AcceptorReceiver = mpsc::UnboundedReceiver<Connection>;
/// Held by the library. Used to notify the application of newly-accepted connections.
///
/// Sending half of an unbounded channel carrying [`Connection`] values.
pub(crate) type AcceptorSender = mpsc::UnboundedSender<Connection>;

/// Held by the library. Used to receive connection attempts from the application.
///
/// Receiving half of a bounded channel carrying [`connect::Request`] values.
pub(crate) type ConnectorReceiver = mpsc::Receiver<connect::Request>;
/// Held by the application. Used to submit connection attempts to the library.
///
/// Sending half of a bounded channel carrying [`connect::Request`] values.
pub(crate) type ConnectorSender = mpsc::Sender<connect::Request>;

/// Held by the library. Used to receive close attempts from the application.
///
/// Each message is a [`Waker`].
pub(crate) type CloseReceiver = mpsc::Receiver<Waker>;
/// Held by the application. Used to submit close attempts to the library.
///
/// Each message is a [`Waker`].
pub(crate) type CloseSender = mpsc::Sender<Waker>;

/// The [`Handle`] allows applications to accept and open QUIC connections on an `Endpoint`.
///
/// It bundles the two application-facing halves: an [`Acceptor`] for incoming
/// connections and a [`Connector`] for outgoing connection attempts.
#[derive(Debug)]
pub(crate) struct Handle {
    /// Half used to accept connections delivered by the library.
    pub acceptor: Acceptor,
    /// Half used to submit connection attempts and to request closing the endpoint.
    pub connector: Connector,
}

impl Handle {
    /// Creates a new `Handle` along with the library-side channel endpoints.
    ///
    /// `max_opening_connections` is passed as the buffer size of both the
    /// connection-attempt channel and the close-request channel. The channel
    /// carrying accepted connections is unbounded.
    ///
    /// The returned tuple contains, in order:
    /// - the application-facing `Handle`,
    /// - the sender used to hand accepted connections to the `Handle`'s acceptor,
    /// - the receiver for connection attempts submitted through the connector,
    /// - the `CloseHandle` built from the close-request receiver and the
    ///   endpoint state shared with the connector's closer.
    pub(crate) fn new(
        max_opening_connections: usize,
    ) -> (Self, AcceptorSender, ConnectorReceiver, CloseHandle) {
        // Accepted connections: library -> application.
        let (accepted_tx, accepted_rx) = mpsc::unbounded();
        let acceptor = Acceptor {
            acceptor: accepted_rx,
        };

        // Connection attempts and close requests: application -> library.
        let (connect_tx, connect_rx) = mpsc::channel(max_opening_connections);
        let (close_tx, close_rx) = mpsc::channel(max_opening_connections);

        // The same endpoint state is given to both the closer and the close handle.
        let endpoint_state = close::EndpointState::default();
        let connector = Connector {
            connector: connect_tx,
            closer: close::Closer::new(close_tx, endpoint_state.clone()),
        };
        let close_handle = CloseHandle::new(close_rx, endpoint_state);

        let handle = Self {
            acceptor,
            connector,
        };
        (handle, accepted_tx, connect_rx, close_handle)
    }
}

/// Application-facing half used to accept connections.
#[derive(Debug)]
pub struct Acceptor {
    /// Receiving end of the channel of accepted connections.
    acceptor: AcceptorReceiver,
}

impl Acceptor {
    /// Polls for the next accepted connection.
    ///
    /// Returns
    /// - `Poll::Ready(Some(connection))` when a connection has been accepted,
    /// - `Poll::Ready(None)` when the acceptor is closed,
    /// - `Poll::Pending` when no accepted connection is available yet.
    ///
    /// On `Poll::Pending` the [`core::task::Waker`] carried by `context` is
    /// stored and woken once polling again would yield a different result,
    /// so the caller should poll again after being woken.
    pub fn poll_accept(&mut self, context: &mut Context) -> Poll<Option<Connection>> {
        // The receiver's `Stream` output already has the exact shape this method
        // returns, so the result is forwarded unchanged.
        let receiver = Pin::new(&mut self.acceptor);
        Stream::poll_next(receiver, context)
    }
}

/// Application-facing half used to open connections and to close the endpoint.
#[derive(Clone, Debug)]
pub struct Connector {
    /// Sending end of the channel of connection attempts.
    connector: ConnectorSender,
    /// Delegate that `poll_close` forwards to.
    closer: close::Closer,
}

impl Connector {
    /// Starts a connection attempt described by `connect`.
    ///
    /// The returned [`connect::Attempt`] is a future to be awaited; it is
    /// created from this connector's request sender and the given `connect` value.
    pub fn connect(&self, connect: connect::Connect) -> connect::Attempt {
        let request_sender = &self.connector;
        connect::Attempt::new(request_sender, connect)
    }

    /// Polls to close the endpoint.
    ///
    /// Delegates to the inner closer and returns its result unchanged.
    pub fn poll_close(&mut self, context: &mut Context) -> Poll<Result<(), connection::Error>> {
        let Self { closer, .. } = self;
        closer.poll_close(context)
    }
}