1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
//! Objects associated with connection handling.

use crate::Node;

use bytes::Bytes;
use fxhash::FxHashMap;
use parking_lot::RwLock;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{
        tcp::{OwnedReadHalf, OwnedWriteHalf},
        TcpStream,
    },
    sync::mpsc::Sender,
    task::JoinHandle,
};
use tracing::*;

use std::{
    io::{self, ErrorKind},
    net::SocketAddr,
    ops::Not,
};

#[derive(Default)]
pub(crate) struct Connections(RwLock<FxHashMap<SocketAddr, Connection>>);

impl Connections {
    pub(crate) fn sender(&self, addr: SocketAddr) -> io::Result<Sender<Bytes>> {
        if let Some(conn) = self.0.read().get(&addr) {
            conn.sender()
        } else {
            Err(ErrorKind::NotConnected.into())
        }
    }

    pub(crate) fn add(&self, conn: Connection) {
        self.0.write().insert(conn.addr, conn);
    }

    pub(crate) fn senders(&self) -> io::Result<Vec<Sender<Bytes>>> {
        self.0.read().values().map(|conn| conn.sender()).collect()
    }

    pub(crate) fn is_connected(&self, addr: SocketAddr) -> bool {
        self.0.read().contains_key(&addr)
    }

    pub(crate) fn remove(&self, addr: SocketAddr) -> bool {
        self.0.write().remove(&addr).is_some()
    }

    pub(crate) fn num_connected(&self) -> usize {
        self.0.read().len()
    }

    pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
        self.0.read().keys().copied().collect()
    }
}

/// Indicates who was the initiator and who was the responder when the connection was established.
#[derive(Clone, Copy)]
pub enum ConnectionSide {
    /// The side that initiated the connection.
    Initiator,
    /// The sider that accepted the connection.
    Responder,
}

impl Not for ConnectionSide {
    type Output = Self;

    fn not(self) -> Self::Output {
        match self {
            Self::Initiator => Self::Responder,
            Self::Responder => Self::Initiator,
        }
    }
}

/// An object dedicated to performing reads from a connection's stream;
/// it is available only if the `Reading` protocol is enabled.
pub struct ConnectionReader {
    /// The tracing span of the owning `Node`.
    pub span: Span,
    /// The address of the connection.
    pub addr: SocketAddr,
    /// A buffer dedicated to reading from the stream.
    pub buffer: Box<[u8]>,
    /// The number of bytes from an incomplete read carried over in the buffer.
    pub carry: usize,
    /// The read half of the stream.
    pub reader: OwnedReadHalf,
}

impl ConnectionReader {
    /// Reads as many bytes as there are queued to be read from the stream.
    pub async fn read_queued_bytes(&mut self) -> io::Result<&[u8]> {
        let len = self.reader.read(&mut self.buffer).await?;
        trace!(parent: &self.span, "read {}B from {}", len, self.addr);

        Ok(&self.buffer[..len])
    }

    /// Reads the specified number of bytes from the stream.
    pub async fn read_exact(&mut self, num: usize) -> io::Result<&[u8]> {
        let buffer = &mut self.buffer;

        if num > buffer.len() {
            error!(parent: &self.span, "can' read {}B from the stream; the buffer is too small ({}B)", num, buffer.len());
            return Err(ErrorKind::Other.into());
        }

        self.reader.read_exact(&mut buffer[..num]).await?;
        trace!(parent: &self.span, "read {}B from {}", num, self.addr);

        Ok(&buffer[..num])
    }
}

/// An object dedicated to performing writes to a connection's stream;
/// it is available only if the `Writing` protocol is enabled.
pub struct ConnectionWriter {
    /// The tracing span of the owning `Node`.
    pub span: Span,
    /// The address of the connection.
    pub addr: SocketAddr,
    /// A buffer dedicated to buffering writes to the stream.
    pub buffer: Box<[u8]>,
    /// The number of bytes from an incomplete write carried over in the buffer.
    pub carry: usize,
    /// The write half of the stream.
    pub writer: OwnedWriteHalf,
}

impl ConnectionWriter {
    /// Writes the given buffer to the stream.
    pub async fn write_all(&mut self, buffer: &[u8]) -> io::Result<()> {
        self.writer.write_all(buffer).await?;
        trace!(parent: &self.span, "wrote {}B to {}", buffer.len(), self.addr);

        Ok(())
    }
}

/// Keeps track of tasks that have been spawned for the purposes of a connection; it
/// also contains a sender that communicates with the `Writing` protocol handler.
pub struct Connection {
    /// A reference to the owning node.
    pub node: Node,
    /// The address of the connection.
    pub addr: SocketAddr,
    /// Kept only until the protocols are enabled (`Reading` should `take()` it).
    pub reader: Option<ConnectionReader>,
    /// Kept only until the protocols are enabled (`Writing` should `take()` it).
    pub writer: Option<ConnectionWriter>,
    /// Handles to tasks spawned by the connection.
    pub tasks: Vec<JoinHandle<()>>,
    /// Used to queue writes to the stream.
    pub outbound_message_sender: Option<Sender<Bytes>>,
    /// The connection's side in relation to the node.
    pub side: ConnectionSide,
}

impl Connection {
    /// Creates a `Connection` with placeholders for protocol-related objects.
    pub(crate) fn new(
        addr: SocketAddr,
        stream: TcpStream,
        side: ConnectionSide,
        node: &Node,
    ) -> Self {
        let (reader, writer) = stream.into_split();

        let reader = ConnectionReader {
            span: node.span().clone(),
            addr,
            buffer: vec![0; node.config().conn_read_buffer_size].into(),
            carry: 0,
            reader,
        };

        let writer = ConnectionWriter {
            span: node.span().clone(),
            addr,
            buffer: vec![0; node.config().conn_write_buffer_size].into(),
            carry: 0,
            writer,
        };

        Self {
            node: node.clone(),
            addr,
            reader: Some(reader),
            writer: Some(writer),
            side,
            tasks: Default::default(),
            outbound_message_sender: Default::default(),
        }
    }

    /// Provides mutable access to the underlying `ConnectionReader`; it should only be used in protocol definitions.
    pub fn reader(&mut self) -> &mut ConnectionReader {
        self.reader
            .as_mut()
            .expect("ConnectionReader is not available!")
    }

    /// Provides mutable access to the underlying `ConnectionWriter`; it should only be used in protocol definitions.
    pub fn writer(&mut self) -> &mut ConnectionWriter {
        self.writer
            .as_mut()
            .expect("ConnectionWriter is not available!")
    }

    /// Returns a `Sender` for outbound messages, as long as `Writing` is enabled.
    fn sender(&self) -> io::Result<Sender<Bytes>> {
        if let Some(ref sender) = self.outbound_message_sender {
            Ok(sender.clone())
        } else {
            error!(parent: self.node.span(), "can't send messages: the Writing protocol is disabled");
            Err(ErrorKind::Other.into())
        }
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        debug!(parent: self.node.span(), "disconnecting from {}", self.addr);

        // shut the associated tasks down
        for task in self.tasks.iter().rev() {
            task.abort();
        }

        // if the (owning) node was not the initiator of the connection, it doesn't know the listening address
        // of the associated peer, so the related stats are unreliable; the next connection initiated by the
        // peer could be bound to an entirely different port number
        if matches!(self.side, ConnectionSide::Initiator) {
            self.node.known_peers().remove(self.addr);
        }
    }
}