snarkos_node_tcp/helpers/
connections.rs1use std::{collections::HashMap, net::SocketAddr, ops::Not, sync::atomic::AtomicBool};
19
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::RwLock;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::RwLock;
24use tokio::{
25 io::{AsyncRead, AsyncWrite},
26 net::TcpStream,
27 sync::oneshot,
28 task::JoinHandle,
29};
30use tracing::*;
31
32#[cfg(doc)]
33use crate::protocols::{Handshake, Reading, Writing};
34
35#[derive(Default)]
37pub(crate) struct Connections(pub(crate) RwLock<HashMap<SocketAddr, Connection>>);
38
39impl Connections {
40 pub(crate) fn add(&self, conn: Connection) {
42 self.0.write().insert(conn.addr, conn);
43 }
44
45 pub(crate) fn is_connected(&self, addr: SocketAddr) -> bool {
47 self.0.read().contains_key(&addr)
48 }
49
50 pub(crate) fn remove(&self, addr: SocketAddr) -> Option<Connection> {
52 self.0.write().remove(&addr)
53 }
54
55 pub(crate) fn num_connected(&self) -> usize {
57 self.0.read().len()
58 }
59
60 pub(crate) fn addrs(&self) -> Vec<SocketAddr> {
62 self.0.read().keys().copied().collect()
63 }
64}
65
66pub(crate) trait AR: AsyncRead + Unpin + Send + Sync {}
68impl<T: AsyncRead + Unpin + Send + Sync> AR for T {}
69
70pub(crate) trait AW: AsyncWrite + Unpin + Send + Sync {}
72impl<T: AsyncWrite + Unpin + Send + Sync> AW for T {}
73
74pub struct Connection {
77 addr: SocketAddr,
79 side: ConnectionSide,
81 pub(crate) stream: Option<TcpStream>,
83 pub(crate) reader: Option<Box<dyn AR>>,
85 pub(crate) writer: Option<Box<dyn AW>>,
87 pub(crate) readiness_notifier: Option<oneshot::Sender<()>>,
89 pub(crate) disconnecting: AtomicBool,
91 pub(crate) tasks: Vec<JoinHandle<()>>,
93 pub(crate) span: Span,
95}
96
97impl Connection {
98 pub(crate) fn new(addr: SocketAddr, stream: TcpStream, side: ConnectionSide, span: Span) -> Self {
100 Self {
101 addr,
102 stream: Some(stream),
103 reader: None,
104 writer: None,
105 readiness_notifier: None,
106 disconnecting: Default::default(),
107 side,
108 tasks: Default::default(),
109 span,
110 }
111 }
112
113 pub fn addr(&self) -> SocketAddr {
115 self.addr
116 }
117
118 pub fn side(&self) -> ConnectionSide {
121 self.side
122 }
123
124 #[inline]
126 pub const fn span(&self) -> &Span {
127 &self.span
128 }
129}
130
/// Identifies one of the two sides of a connection.
///
/// The type is a plain `Copy` value. It also derives `Hash` (consistent with
/// its `Eq` implementation), so it can be used as a `HashMap` key or stored in
/// a `HashSet`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ConnectionSide {
    /// The initiating side of the connection.
    Initiator,
    /// The responding side of the connection.
    Responder,
}
139
140impl Not for ConnectionSide {
141 type Output = Self;
142
143 fn not(self) -> Self::Output {
144 match self {
145 Self::Initiator => Self::Responder,
146 Self::Responder => Self::Initiator,
147 }
148 }
149}
150
151impl Drop for Connection {
152 fn drop(&mut self) {
153 for task in self.tasks.iter().rev() {
154 task.abort();
155 }
156 }
157}
158
159pub(crate) fn create_connection_span(addr: SocketAddr, parent: &Span) -> Span {
160 macro_rules! try_span {
161 ($lvl:expr) => {
162 let s = span!(parent: parent, $lvl, "conn", addr = %addr);
163 if !s.is_disabled() {
164 return s;
165 }
166 };
167 }
168 try_span!(Level::TRACE);
169 try_span!(Level::DEBUG);
170 try_span!(Level::INFO);
171 try_span!(Level::WARN);
172 error_span!(parent: parent, "conn", addr = %addr)
173}