use crate::Node;
use bytes::Bytes;
use fxhash::FxHashMap;
use parking_lot::RwLock;
use tokio::{
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
},
sync::mpsc::Sender,
task::JoinHandle,
};
use tracing::*;
use std::{io, net::SocketAddr, ops::Not};
#[derive(Default)]
pub(crate) struct Connections(RwLock<FxHashMap<SocketAddr, Connection>>);
impl Connections {
pub(crate) fn sender(&self, addr: SocketAddr) -> io::Result<Sender<Bytes>> {
if let Some(conn) = self.0.read().get(&addr) {
conn.sender()
} else {
Err(io::ErrorKind::NotConnected.into())
}
}
pub(crate) fn add(&self, conn: Connection) {
self.0.write().insert(conn.addr, conn);
}
pub(crate) fn senders(&self) -> Vec<(Sender<Bytes>, SocketAddr)> {
self.0
.read()
.values()
.filter_map(|conn| conn.sender().ok().map(|sender| (sender, conn.addr)))
.collect()
}
pub(crate) fn is_connected(&self, addr: SocketAddr) -> bool {
self.0.read().contains_key(&addr)
}
pub(crate) fn remove(&self, addr: SocketAddr) -> bool {
self.0.write().remove(&addr).is_some()
}
pub(crate) fn num_connected(&self) -> usize {
self.0.read().len()
}
pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
self.0.read().keys().copied().collect()
}
}
#[derive(Clone, Copy, Debug)]
pub enum ConnectionSide {
Initiator,
Responder,
}
impl Not for ConnectionSide {
type Output = Self;
fn not(self) -> Self::Output {
match self {
Self::Initiator => Self::Responder,
Self::Responder => Self::Initiator,
}
}
}
pub struct Connection {
pub node: Node,
pub addr: SocketAddr,
pub reader: Option<OwnedReadHalf>,
pub writer: Option<OwnedWriteHalf>,
pub tasks: Vec<JoinHandle<()>>,
pub outbound_message_sender: Option<Sender<Bytes>>,
pub side: ConnectionSide,
}
impl Connection {
pub(crate) fn new(
addr: SocketAddr,
stream: TcpStream,
side: ConnectionSide,
node: &Node,
) -> Self {
let (reader, writer) = stream.into_split();
Self {
node: node.clone(),
addr,
reader: Some(reader),
writer: Some(writer),
side,
tasks: Default::default(),
outbound_message_sender: Default::default(),
}
}
pub fn reader(&mut self) -> &mut OwnedReadHalf {
self.reader
.as_mut()
.expect("Connection's reader is not available!")
}
pub fn writer(&mut self) -> &mut OwnedWriteHalf {
self.writer
.as_mut()
.expect("Connection's writer is not available!")
}
fn sender(&self) -> io::Result<Sender<Bytes>> {
if let Some(ref sender) = self.outbound_message_sender {
Ok(sender.clone())
} else {
error!(parent: self.node.span(), "can't send messages: the Writing protocol is disabled");
Err(io::ErrorKind::Other.into())
}
}
}
impl Drop for Connection {
fn drop(&mut self) {
debug!(parent: self.node.span(), "disconnecting from {}", self.addr);
for task in self.tasks.iter().rev() {
task.abort();
}
if matches!(self.side, ConnectionSide::Initiator) {
self.node.known_peers().remove(self.addr);
}
}
}