use std::{collections::HashMap, net::SocketAddr, ops::Not, sync::atomic::AtomicBool};
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
sync::oneshot,
task::JoinHandle,
};
use tracing::*;
#[cfg(doc)]
use crate::protocols::{Handshake, Reading, Writing};
/// A collection of [`Connection`] records keyed by their socket address.
///
/// The inner map sits behind an `RwLock`, so every method on this type takes
/// `&self`. `Default` produces an empty collection.
#[derive(Default)]
pub(crate) struct Connections(pub(crate) RwLock<HashMap<SocketAddr, Connection>>);
impl Connections {
    /// Stores `conn` under its own address, replacing (and dropping) any
    /// record previously stored under that address.
    pub(crate) fn add(&self, conn: Connection) {
        let key = conn.addr;
        let mut table = self.0.write();
        table.insert(key, conn);
    }

    /// Reports whether a record is currently stored under `addr`.
    pub(crate) fn is_connected(&self, addr: SocketAddr) -> bool {
        let table = self.0.read();
        table.contains_key(&addr)
    }

    /// Takes the record stored under `addr` out of the collection, returning
    /// `None` when there is no such record.
    pub(crate) fn remove(&self, addr: SocketAddr) -> Option<Connection> {
        let mut table = self.0.write();
        table.remove(&addr)
    }

    /// Returns how many records are currently stored.
    pub(crate) fn num_connected(&self) -> usize {
        let table = self.0.read();
        table.len()
    }

    /// Returns a snapshot of the addresses of all stored records, in the
    /// map's (unspecified) iteration order.
    pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
        let table = self.0.read();
        let mut snapshot = Vec::with_capacity(table.len());
        snapshot.extend(table.keys().copied());
        snapshot
    }
}
/// Shorthand for the bounds required of a connection's reader; it exists so
/// that the reader can be stored as a `Box<dyn AR>` trait object.
pub(crate) trait AR: AsyncRead + Unpin + Send + Sync {}

// Blanket implementation: anything meeting the bounds is automatically an `AR`.
impl<T> AR for T where T: AsyncRead + Unpin + Send + Sync {}
/// Shorthand for the bounds required of a connection's writer; it exists so
/// that the writer can be stored as a `Box<dyn AW>` trait object.
pub(crate) trait AW: AsyncWrite + Unpin + Send + Sync {}

// Blanket implementation: anything meeting the bounds is automatically an `AW`.
impl<T> AW for T where T: AsyncWrite + Unpin + Send + Sync {}
/// The state kept for a single connection.
///
/// Records are stored in `Connections` keyed by `addr`. Dropping a
/// `Connection` aborts every task handle held in `tasks`.
pub struct Connection {
// The socket address this connection is keyed by; exposed through `addr()`.
// NOTE(review): presumably the peer's address - confirm against callers.
addr: SocketAddr,
// The side associated with this connection; exposed through `side()`.
// NOTE(review): whether this is the local node's side or the peer's is not
// visible here - confirm.
side: ConnectionSide,
// The TCP stream; always `Some` right after `Connection::new`.
// NOTE(review): presumably taken out once it is split or handed over - confirm.
pub(crate) stream: Option<TcpStream>,
// Boxed reader; `None` right after `Connection::new`.
pub(crate) reader: Option<Box<dyn AR>>,
// Boxed writer; `None` right after `Connection::new`.
pub(crate) writer: Option<Box<dyn AW>>,
// One-shot sender; `None` right after `Connection::new`.
// NOTE(review): presumably used to signal that some task became ready - confirm.
pub(crate) readiness_notifier: Option<oneshot::Sender<()>>,
// Flag that starts out `false`.
// NOTE(review): presumably set once a disconnect is under way - confirm.
pub(crate) disconnecting: AtomicBool,
// Handles of tasks tied to this connection; starts empty, and every handle
// is aborted (last to first) when the connection is dropped.
pub(crate) tasks: Vec<JoinHandle<()>>,
// The tracing span of this connection; exposed through `span()`.
pub(crate) span: Span,
}
impl Connection {
pub(crate) fn new(addr: SocketAddr, stream: TcpStream, side: ConnectionSide, span: Span) -> Self {
Self {
addr,
stream: Some(stream),
reader: None,
writer: None,
readiness_notifier: None,
disconnecting: Default::default(),
side,
tasks: Default::default(),
span,
}
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
pub fn side(&self) -> ConnectionSide {
self.side
}
#[inline]
pub const fn span(&self) -> &Span {
&self.span
}
}
/// Indicates one of the two sides of a connection.
///
/// Applying `!` to a side yields the opposite one.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionSide {
    /// The initiating side of the connection.
    Initiator,
    /// The responding side of the connection.
    Responder,
}

impl Not for ConnectionSide {
    type Output = Self;

    /// Flips the side: `Initiator` becomes `Responder` and vice versa.
    fn not(self) -> Self {
        // Kept as an exhaustive match so that adding a variant is a compile error here.
        match self {
            ConnectionSide::Responder => ConnectionSide::Initiator,
            ConnectionSide::Initiator => ConnectionSide::Responder,
        }
    }
}
impl Drop for Connection {
    /// Aborts every task registered in `tasks`, walking the list from the
    /// most recently added handle back to the first.
    fn drop(&mut self) {
        self.tasks.iter().rev().for_each(|handle| handle.abort());
    }
}
pub(crate) fn create_connection_span(addr: SocketAddr, parent: &Span) -> Span {
macro_rules! try_span {
($lvl:expr) => {
let s = span!(parent: parent, $lvl, "conn", addr = %addr);
if !s.is_disabled() {
return s;
}
};
}
try_span!(Level::TRACE);
try_span!(Level::DEBUG);
try_span!(Level::INFO);
try_span!(Level::WARN);
error_span!(parent: parent, "conn", addr = %addr)
}