libp2p_core/
connection.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod error;
22mod handler;
23mod listeners;
24mod substream;
25
26pub(crate) mod manager;
27pub(crate) mod pool;
28
29pub use error::{ConnectionError, PendingConnectionError};
30pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
31pub use listeners::{ListenerId, ListenersStream, ListenersEvent};
32pub use manager::ConnectionId;
33pub use substream::{Substream, SubstreamEndpoint, Close};
34pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
35pub use pool::{ConnectionLimits, ConnectionCounters};
36
37use crate::muxing::StreamMuxer;
38use crate::{Multiaddr, PeerId};
39use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
40use std::hash::Hash;
41use substream::{Muxing, SubstreamEvent};
42
43/// The endpoint roles associated with a peer-to-peer communication channel.
44#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
45pub enum Endpoint {
46    /// The socket comes from a dialer.
47    Dialer,
48    /// The socket comes from a listener.
49    Listener,
50}
51
52impl std::ops::Not for Endpoint {
53    type Output = Endpoint;
54
55    fn not(self) -> Self::Output {
56        match self {
57            Endpoint::Dialer => Endpoint::Listener,
58            Endpoint::Listener => Endpoint::Dialer
59        }
60    }
61}
62
63impl Endpoint {
64    /// Is this endpoint a dialer?
65    pub fn is_dialer(self) -> bool {
66        matches!(self, Endpoint::Dialer)
67    }
68
69    /// Is this endpoint a listener?
70    pub fn is_listener(self) -> bool {
71        matches!(self, Endpoint::Listener)
72    }
73}
74
75/// The endpoint roles associated with a peer-to-peer connection.
76#[derive(PartialEq, Eq, Debug, Clone, Hash)]
77pub enum ConnectedPoint {
78    /// We dialed the node.
79    Dialer {
80        /// Multiaddress that was successfully dialed.
81        address: Multiaddr,
82    },
83    /// We received the node.
84    Listener {
85        /// Local connection address.
86        local_addr: Multiaddr,
87        /// Stack of protocols used to send back data to the remote.
88        send_back_addr: Multiaddr,
89    }
90}
91
92impl From<&'_ ConnectedPoint> for Endpoint {
93    fn from(endpoint: &'_ ConnectedPoint) -> Endpoint {
94        endpoint.to_endpoint()
95    }
96}
97
98impl From<ConnectedPoint> for Endpoint {
99    fn from(endpoint: ConnectedPoint) -> Endpoint {
100        endpoint.to_endpoint()
101    }
102}
103
104impl ConnectedPoint {
105    /// Turns the `ConnectedPoint` into the corresponding `Endpoint`.
106    pub fn to_endpoint(&self) -> Endpoint {
107        match self {
108            ConnectedPoint::Dialer { .. } => Endpoint::Dialer,
109            ConnectedPoint::Listener { .. } => Endpoint::Listener
110        }
111    }
112
113    /// Returns true if we are `Dialer`.
114    pub fn is_dialer(&self) -> bool {
115        match self {
116            ConnectedPoint::Dialer { .. } => true,
117            ConnectedPoint::Listener { .. } => false
118        }
119    }
120
121    /// Returns true if we are `Listener`.
122    pub fn is_listener(&self) -> bool {
123        match self {
124            ConnectedPoint::Dialer { .. } => false,
125            ConnectedPoint::Listener { .. } => true
126        }
127    }
128
129    /// Returns the address of the remote stored in this struct.
130    ///
131    /// For `Dialer`, this returns `address`. For `Listener`, this returns `send_back_addr`.
132    ///
133    /// Note that the remote node might not be listening on this address and hence the address might
134    /// not be usable to establish new connections.
135    pub fn get_remote_address(&self) -> &Multiaddr {
136        match self {
137            ConnectedPoint::Dialer { address } => address,
138            ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
139        }
140    }
141
142    /// Modifies the address of the remote stored in this struct.
143    ///
144    /// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
145    pub fn set_remote_address(&mut self, new_address: Multiaddr) {
146        match self {
147            ConnectedPoint::Dialer { address } => *address = new_address,
148            ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
149        }
150    }
151}
152
153/// Information about a successfully established connection.
154#[derive(Debug, Clone, PartialEq, Eq)]
155pub struct Connected {
156    /// The connected endpoint, including network address information.
157    pub endpoint: ConnectedPoint,
158    /// Information obtained from the transport.
159    pub peer_id: PeerId,
160}
161
162/// Event generated by a [`Connection`].
163#[derive(Debug, Clone)]
164pub enum Event<T> {
165    /// Event generated by the [`ConnectionHandler`].
166    Handler(T),
167    /// Address of the remote has changed.
168    AddressChange(Multiaddr),
169}
170
171/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
172pub struct Connection<TMuxer, THandler>
173where
174    TMuxer: StreamMuxer,
175    THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
176{
177    /// Node that handles the muxing.
178    muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
179    /// Handler that processes substreams.
180    handler: THandler,
181}
182
183impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
184where
185    TMuxer: StreamMuxer,
186    THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
187{
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        f.debug_struct("Connection")
190            .field("muxing", &self.muxing)
191            .field("handler", &self.handler)
192            .finish()
193    }
194}
195
196impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
197where
198    TMuxer: StreamMuxer,
199    THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
200{
201}
202
203impl<TMuxer, THandler> Connection<TMuxer, THandler>
204where
205    TMuxer: StreamMuxer,
206    THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
207{
208    /// Builds a new `Connection` from the given substream multiplexer
209    /// and connection handler.
210    pub fn new(muxer: TMuxer, handler: THandler) -> Self {
211        Connection {
212            muxing: Muxing::new(muxer),
213            handler,
214        }
215    }
216
217    /// Returns a reference to the `ConnectionHandler`
218    pub fn handler(&self) -> &THandler {
219        &self.handler
220    }
221
222    /// Returns a mutable reference to the `ConnectionHandler`
223    pub fn handler_mut(&mut self) -> &mut THandler {
224        &mut self.handler
225    }
226
227    /// Notifies the connection handler of an event.
228    pub fn inject_event(&mut self, event: THandler::InEvent) {
229        self.handler.inject_event(event);
230    }
231
232    /// Begins an orderly shutdown of the connection, returning a
233    /// `Future` that resolves when connection shutdown is complete.
234    pub fn close(self) -> Close<TMuxer> {
235        self.muxing.close().0
236    }
237
238    /// Polls the connection for events produced by the associated handler
239    /// as a result of I/O activity on the substream multiplexer.
240    pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
241        -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>>
242    {
243        loop {
244            let mut io_pending = false;
245
246            // Perform I/O on the connection through the muxer, informing the handler
247            // of new substreams.
248            match self.muxing.poll(cx) {
249                Poll::Pending => io_pending = true,
250                Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => {
251                    self.handler.inject_substream(substream, SubstreamEndpoint::Listener)
252                }
253                Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
254                    let endpoint = SubstreamEndpoint::Dialer(user_data);
255                    self.handler.inject_substream(substream, endpoint)
256                }
257                Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
258                    self.handler.inject_address_change(&address);
259                    return Poll::Ready(Ok(Event::AddressChange(address)));
260                }
261                Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
262            }
263
264            // Poll the handler for new events.
265            match self.handler.poll(cx) {
266                Poll::Pending => {
267                    if io_pending {
268                        return Poll::Pending // Nothing to do
269                    }
270                }
271                Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
272                    self.muxing.open_substream(user_data);
273                }
274                Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
275                    return Poll::Ready(Ok(Event::Handler(event)));
276                }
277                Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
278            }
279        }
280    }
281}
282
283/// Borrowed information about an incoming connection currently being negotiated.
284#[derive(Debug, Copy, Clone)]
285pub struct IncomingInfo<'a> {
286    /// Local connection address.
287    pub local_addr: &'a Multiaddr,
288    /// Stack of protocols used to send back data to the remote.
289    pub send_back_addr: &'a Multiaddr,
290}
291
292impl<'a> IncomingInfo<'a> {
293    /// Builds the `ConnectedPoint` corresponding to the incoming connection.
294    pub fn to_connected_point(&self) -> ConnectedPoint {
295        ConnectedPoint::Listener {
296            local_addr: self.local_addr.clone(),
297            send_back_addr: self.send_back_addr.clone(),
298        }
299    }
300}
301
302/// Borrowed information about an outgoing connection currently being negotiated.
303#[derive(Debug, Copy, Clone)]
304pub struct OutgoingInfo<'a> {
305    pub address: &'a Multiaddr,
306    pub peer_id: Option<&'a PeerId>,
307}
308
309impl<'a> OutgoingInfo<'a> {
310    /// Builds a `ConnectedPoint` corresponding to the outgoing connection.
311    pub fn to_connected_point(&self) -> ConnectedPoint {
312        ConnectedPoint::Dialer {
313            address: self.address.clone()
314        }
315    }
316}
317
318/// Information about a connection limit.
319#[derive(Debug, Clone)]
320pub struct ConnectionLimit {
321    /// The maximum number of connections.
322    pub limit: u32,
323    /// The current number of connections.
324    pub current: u32,
325}
326
327impl fmt::Display for ConnectionLimit {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329        write!(f, "{}/{}", self.current, self.limit)
330    }
331}
332
333/// A `ConnectionLimit` can represent an error if it has been exceeded.
334impl Error for ConnectionLimit {}