1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! Objects associated with connection handling.

use std::{collections::HashMap, net::SocketAddr, ops::Not, sync::Arc};

use parking_lot::RwLock;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    net::TcpStream,
    sync::oneshot,
    task::JoinHandle,
};

use crate::Stats;

#[cfg(doc)]
use crate::protocols::{Handshake, Reading, Writing};

/// A lock-guarded collection of [`Connection`]s, keyed by the address each of them
/// is associated with.
#[derive(Default)]
pub(crate) struct Connections(RwLock<HashMap<SocketAddr, Connection>>);

impl Connections {
    /// Stores `conn` under its own address; an entry that was already present under
    /// that address is replaced.
    pub(crate) fn add(&self, conn: Connection) {
        let addr = conn.addr();
        let mut conns = self.0.write();
        conns.insert(addr, conn);
    }

    /// Returns a copy of the [`ConnectionInfo`] kept for `addr`, or `None` if there
    /// is no entry for that address.
    pub(crate) fn get_info(&self, addr: SocketAddr) -> Option<ConnectionInfo> {
        let conns = self.0.read();
        let conn = conns.get(&addr)?;
        Some(conn.info.clone())
    }

    /// Returns a snapshot mapping every stored address to a copy of its
    /// [`ConnectionInfo`].
    pub(crate) fn infos(&self) -> HashMap<SocketAddr, ConnectionInfo> {
        let conns = self.0.read();
        let mut infos = HashMap::with_capacity(conns.len());
        for (addr, conn) in conns.iter() {
            infos.insert(*addr, conn.info.clone());
        }
        infos
    }

    /// Checks whether there is an entry for `addr`.
    pub(crate) fn is_connected(&self, addr: SocketAddr) -> bool {
        let conns = self.0.read();
        conns.contains_key(&addr)
    }

    /// Takes the entry for `addr` out of the collection and hands it to the caller;
    /// returns `None` if there was no such entry.
    pub(crate) fn remove(&self, addr: SocketAddr) -> Option<Connection> {
        let mut conns = self.0.write();
        conns.remove(&addr)
    }

    /// Returns the number of entries currently stored.
    pub(crate) fn num_connected(&self) -> usize {
        let conns = self.0.read();
        conns.len()
    }

    /// Returns the addresses of all the entries currently stored, in no particular
    /// order.
    pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
        let conns = self.0.read();
        let mut addrs = Vec::with_capacity(conns.len());
        addrs.extend(conns.keys().copied());
        addrs
    }
}

/// Tells apart the two ends of a connection: the one that initiated it and the one
/// that responded when it was being established.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionSide {
    /// The side that initiated the connection.
    Initiator,
    /// The side that accepted the connection.
    Responder,
}

/// Applying `!` to a side yields the opposite side.
impl Not for ConnectionSide {
    type Output = ConnectionSide;

    fn not(self) -> ConnectionSide {
        match self {
            ConnectionSide::Responder => ConnectionSide::Initiator,
            ConnectionSide::Initiator => ConnectionSide::Responder,
        }
    }
}

/// Bundles the bounds required of a connection reader under a single name, so that
/// readers of different concrete types can be handled as `dyn AR` trait objects.
pub(crate) trait AR: AsyncRead + Unpin + Send + Sync {}

/// Any type that meets the bounds is an [`AR`]; no manual implementations are needed.
impl<T> AR for T where T: AsyncRead + Unpin + Send + Sync {}

/// Bundles the bounds required of a connection writer under a single name, so that
/// writers of different concrete types can be handled as `dyn AW` trait objects.
pub(crate) trait AW: AsyncWrite + Unpin + Send + Sync {}

/// Any type that meets the bounds is an [`AW`]; no manual implementations are needed.
impl<T> AW for T where T: AsyncWrite + Unpin + Send + Sync {}

/// Basic information related to a connection.
///
/// The statistics are kept behind an [`Arc`], so cloning this object does not copy
/// them: every clone refers to the same [`Stats`] instance.
#[derive(Clone)]
pub struct ConnectionInfo {
    /// The address of the connection.
    addr: SocketAddr,
    /// The connection's side in relation to the node.
    side: ConnectionSide,
    /// Basic statistics related to a connection; shared between all the clones
    /// of this object.
    stats: Arc<Stats>,
}

impl ConnectionInfo {
    /// Returns the address associated with the connection.
    #[inline]
    pub const fn addr(&self) -> SocketAddr {
        self.addr
    }

    /// Returns `Initiator` if the associated peer initiated the connection
    /// and `Responder` if the connection request was initiated by the node.
    #[inline]
    pub const fn side(&self) -> ConnectionSide {
        self.side
    }

    /// Returns basic statistics related to a connection.
    ///
    /// A reference to the shared pointer is returned; clone the [`Arc`] in order to
    /// hold on to the statistics independently of this object.
    #[inline]
    pub const fn stats(&self) -> &Arc<Stats> {
        &self.stats
    }
}

/// The per-connection state: one instance is created for each active connection.
/// The protocols use it to obtain a handle for reading and writing, and it keeps
/// track of the tasks that have been spawned for the purposes of the connection.
pub struct Connection {
    /// Basic information related to the connection.
    pub(crate) info: ConnectionInfo,
    /// The TCP stream; available and used only in the [`Handshake`] protocol.
    pub(crate) stream: Option<TcpStream>,
    /// The connection's reader; available and used only in the [`Reading`] protocol.
    pub(crate) reader: Option<Box<dyn AR>>,
    /// The connection's writer; available and used only in the [`Writing`] protocol.
    pub(crate) writer: Option<Box<dyn AW>>,
    /// Notifies the [`Reading`] protocol that the connection is fully ready.
    pub(crate) readiness_notifier: Option<oneshot::Sender<()>>,
    /// Handles to the tasks spawned for the connection.
    pub(crate) tasks: Vec<JoinHandle<()>>,
}

impl Connection {
    /// Creates a [`Connection`] that owns the given `stream` and has fresh statistics;
    /// the reader, the writer and the readiness notifier are left unset, and the list
    /// of tasks starts out empty.
    pub(crate) fn new(addr: SocketAddr, stream: TcpStream, side: ConnectionSide) -> Self {
        let info = ConnectionInfo {
            addr,
            side,
            stats: Arc::default(),
        };

        Self {
            info,
            stream: Some(stream),
            reader: None,
            writer: None,
            readiness_notifier: None,
            tasks: Vec::new(),
        }
    }

    /// Returns the basic information associated with the connection.
    #[inline]
    pub const fn info(&self) -> &ConnectionInfo {
        &self.info
    }

    /// Returns the address associated with the connection.
    #[inline]
    pub const fn addr(&self) -> SocketAddr {
        self.info.addr
    }

    /// Returns the connection's side: `Initiator` if the associated peer initiated
    /// the connection, and `Responder` if the connection request was initiated by
    /// the node.
    #[inline]
    pub const fn side(&self) -> ConnectionSide {
        self.info.side
    }

    /// Returns the basic statistics related to the connection.
    #[inline]
    pub const fn stats(&self) -> &Arc<Stats> {
        &self.info.stats
    }
}