1mod error;
22mod handler;
23mod listeners;
24mod substream;
25
26pub(crate) mod manager;
27pub(crate) mod pool;
28
29pub use error::{ConnectionError, PendingConnectionError};
30pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
31pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
32pub use manager::ConnectionId;
33pub use substream::{Substream, SubstreamEndpoint, Close};
34pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
35pub use pool::{ConnectionLimits, ConnectionCounters};
36
37use crate::muxing::StreamMuxer;
38use crate::{Multiaddr, PeerId};
39use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
40use std::hash::Hash;
41use substream::{Muxing, SubstreamEvent};
42
/// The role taken on one side of a connection: either the side that dialed
/// or the side that was listening.
///
/// The two roles are mutually exclusive; `!endpoint` yields the opposite one.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
    /// The dialing side.
    Dialer,
    /// The listening side.
    Listener,
}
51
52impl std::ops::Not for Endpoint {
53 type Output = Endpoint;
54
55 fn not(self) -> Self::Output {
56 match self {
57 Endpoint::Dialer => Endpoint::Listener,
58 Endpoint::Listener => Endpoint::Dialer
59 }
60 }
61}
62
63impl Endpoint {
64 pub fn is_dialer(self) -> bool {
66 matches!(self, Endpoint::Dialer)
67 }
68
69 pub fn is_listener(self) -> bool {
71 matches!(self, Endpoint::Listener)
72 }
73}
74
75#[derive(PartialEq, Eq, Debug, Clone, Hash)]
77pub enum ConnectedPoint {
78 Dialer {
80 address: Multiaddr,
82 },
83 Listener {
85 local_addr: Multiaddr,
87 send_back_addr: Multiaddr,
89 }
90}
91
92impl From<&'_ ConnectedPoint> for Endpoint {
93 fn from(endpoint: &'_ ConnectedPoint) -> Endpoint {
94 endpoint.to_endpoint()
95 }
96}
97
98impl From<ConnectedPoint> for Endpoint {
99 fn from(endpoint: ConnectedPoint) -> Endpoint {
100 endpoint.to_endpoint()
101 }
102}
103
104impl ConnectedPoint {
105 pub fn to_endpoint(&self) -> Endpoint {
107 match self {
108 ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
109 ConnectedPoint::Listener { .. } => Endpoint::Listener
110 }
111 }
112
113 pub fn is_dialer(&self) -> bool {
115 match self {
116 ConnectedPoint::Dialer { .. } => true,
117 ConnectedPoint::Listener { .. } => false
118 }
119 }
120
121 pub fn is_listener(&self) -> bool {
123 match self {
124 ConnectedPoint::Dialer { .. } => false,
125 ConnectedPoint::Listener { .. } => true
126 }
127 }
128
129 pub fn get_remote_address(&self) -> &Multiaddr {
136 match self {
137 ConnectedPoint::Dialer { address } => address,
138 ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
139 }
140 }
141
142 pub fn set_remote_address(&mut self, new_address: Multiaddr) {
146 match self {
147 ConnectedPoint::Dialer { address } => *address = new_address,
148 ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
149 }
150 }
151}
152
153#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct Connected {
156 pub endpoint: ConnectedPoint,
158 pub peer_id: PeerId,
160}
161
162#[derive(Debug, Clone)]
164pub enum Event<T> {
165 Handler(T),
167 AddressChange(Multiaddr),
169}
170
171pub struct Connection<TMuxer, THandler>
173where
174 TMuxer: StreamMuxer,
175 THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
176{
177 muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
179 handler: THandler,
181}
182
183impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
184where
185 TMuxer: StreamMuxer,
186 THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
187{
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 f.debug_struct("Connection")
190 .field("muxing", &self.muxing)
191 .field("handler", &self.handler)
192 .finish()
193 }
194}
195
196impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
197where
198 TMuxer: StreamMuxer,
199 THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
200{
201}
202
203impl<TMuxer, THandler> Connection<TMuxer, THandler>
204where
205 TMuxer: StreamMuxer,
206 THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
207{
208 pub fn new(muxer: TMuxer, handler: THandler) -> Self {
211 Connection {
212 muxing: Muxing::new(muxer),
213 handler,
214 }
215 }
216
217 pub fn handler(&self) -> &THandler {
219 &self.handler
220 }
221
222 pub fn handler_mut(&mut self) -> &mut THandler {
224 &mut self.handler
225 }
226
227 pub fn inject_event(&mut self, event: THandler::InEvent) {
229 self.handler.inject_event(event);
230 }
231
232 pub fn close(self) -> Close<TMuxer> {
235 self.muxing.close().0
236 }
237
238 pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
241 -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>>
242 {
243 loop {
244 let mut io_pending = false;
245
246 match self.muxing.poll(cx) {
249 Poll::Pending => io_pending = true,
250 Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
251 self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
252 }
253 Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
254 let endpoint = SubstreamEndpoint::Dialer(user_data);
255 self.handler.inject_substream(substream, endpoint)
256 }
257 Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
258 self.handler.inject_address_change(&address);
259 return Poll::Ready(Ok(Event::AddressChange(address)));
260 }
261 Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
262 }
263
264 match self.handler.poll(cx) {
266 Poll::Pending => {
267 if io_pending {
268 return Poll::Pending }
270 }
271 Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
272 self.muxing.open_substream(user_data);
273 }
274 Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
275 return Poll::Ready(Ok(Event::Handler(event)));
276 }
277 Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
278 }
279 }
280 }
281}
282
283#[derive(Debug, Copy, Clone)]
285pub struct IncomingInfo<'a> {
286 pub local_addr: &'a Multiaddr,
288 pub send_back_addr: &'a Multiaddr,
290}
291
292impl<'a> IncomingInfo<'a> {
293 pub fn to_connected_point(&self) -> ConnectedPoint {
295 ConnectedPoint::Listener {
296 local_addr: self.local_addr.clone(),
297 send_back_addr: self.send_back_addr.clone(),
298 }
299 }
300}
301
302#[derive(Debug, Copy, Clone)]
304pub struct OutgoingInfo<'a> {
305 pub address: &'a Multiaddr,
306 pub peer_id: Option<&'a PeerId>,
307}
308
309impl<'a> OutgoingInfo<'a> {
310 pub fn to_connected_point(&self) -> ConnectedPoint {
312 ConnectedPoint::Dialer {
313 address: self.address.clone()
314 }
315 }
316}
317
/// Information about a connection limit: the limit itself and the current
/// count. The `Display` impl in this file prints it as `current/limit`.
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
    /// The limit; printed after the slash.
    pub limit: u32,
    /// The current count; printed before the slash.
    pub current: u32,
}
326
327impl fmt::Display for ConnectionLimit {
328 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329 write!(f, "{}/{}", self.current, self.limit)
330 }
331}
332
333impl Error for ConnectionLimit {}