use std::{
collections::HashMap,
net::SocketAddr,
ops::Not,
sync::{Arc, atomic::AtomicBool},
};
use parking_lot::RwLock;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
sync::oneshot,
task::JoinHandle,
};
use crate::Stats;
#[cfg(doc)]
use crate::protocols::{Handshake, Reading, Writing};
#[derive(Default)]
pub(crate) struct Connections(pub(crate) RwLock<HashMap<SocketAddr, Connection>>);
impl Connections {
pub(crate) fn add(&self, conn: Connection) {
self.0.write().insert(conn.addr(), conn);
}
pub(crate) fn get_info(&self, addr: SocketAddr) -> Option<ConnectionInfo> {
self.0.read().get(&addr).map(|conn| conn.info.clone())
}
pub(crate) fn infos(&self) -> HashMap<SocketAddr, ConnectionInfo> {
self.0
.read()
.iter()
.map(|(addr, conn)| (*addr, conn.info.clone()))
.collect()
}
pub(crate) fn is_connected(&self, addr: SocketAddr) -> bool {
self.0.read().contains_key(&addr)
}
pub(crate) fn remove(&self, addr: SocketAddr) -> Option<Connection> {
self.0.write().remove(&addr)
}
pub(crate) fn num_connected(&self) -> usize {
self.0.read().len()
}
pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
self.0.read().keys().copied().collect()
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionSide {
Initiator,
Responder,
}
impl Not for ConnectionSide {
type Output = Self;
fn not(self) -> Self::Output {
match self {
Self::Initiator => Self::Responder,
Self::Responder => Self::Initiator,
}
}
}
pub(crate) trait AR: AsyncRead + Unpin + Send + Sync {}
impl<T: AsyncRead + Unpin + Send + Sync> AR for T {}
pub(crate) trait AW: AsyncWrite + Unpin + Send + Sync {}
impl<T: AsyncWrite + Unpin + Send + Sync> AW for T {}
#[derive(Clone)]
pub struct ConnectionInfo {
addr: SocketAddr,
side: ConnectionSide,
stats: Arc<Stats>,
}
impl ConnectionInfo {
#[inline]
pub const fn addr(&self) -> SocketAddr {
self.addr
}
#[inline]
pub const fn side(&self) -> ConnectionSide {
self.side
}
#[inline]
pub const fn stats(&self) -> &Arc<Stats> {
&self.stats
}
}
pub struct Connection {
pub(crate) info: ConnectionInfo,
pub(crate) stream: Option<TcpStream>,
pub(crate) reader: Option<Box<dyn AR>>,
pub(crate) writer: Option<Box<dyn AW>>,
pub(crate) readiness_notifier: Option<oneshot::Sender<()>>,
pub(crate) disconnecting: AtomicBool,
pub(crate) tasks: Vec<JoinHandle<()>>,
}
impl Connection {
pub(crate) fn new(addr: SocketAddr, stream: TcpStream, side: ConnectionSide) -> Self {
Self {
info: ConnectionInfo {
addr,
side,
stats: Default::default(),
},
stream: Some(stream),
reader: None,
writer: None,
readiness_notifier: None,
disconnecting: Default::default(),
tasks: Default::default(),
}
}
#[inline]
pub const fn info(&self) -> &ConnectionInfo {
&self.info
}
#[inline]
pub const fn addr(&self) -> SocketAddr {
self.info.addr()
}
#[inline]
pub const fn side(&self) -> ConnectionSide {
self.info.side()
}
#[inline]
pub const fn stats(&self) -> &Arc<Stats> {
self.info.stats()
}
}