libp2prs_core/transport.rs
1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2// Copyright 2020 Netwarps Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21
22//! Connection-oriented communication channels.
23//!
24//! The main entity of this module is the [`Transport`] trait, which provides an
25//! interface for establishing connections with other nodes, thereby negotiating
26//! any desired protocols.
27
28use async_trait::async_trait;
29use futures::prelude::*;
30use std::pin::Pin;
31use std::task::{Context, Poll};
32use std::time::Duration;
33use std::{error::Error, fmt};
34
35use libp2prs_multiaddr::Multiaddr;
36
37use crate::multistream::NegotiationError;
38use libp2p_pnet::PnetError;
39
40pub mod dummy;
41pub mod memory;
42pub mod protector;
43pub mod timeout;
44pub mod upgrade;
45
/// A transport provides connection-oriented communication between two peers
/// through ordered streams of data (i.e. connections).
///
/// Connections are established either by [accepting](TransportListener::accept)
/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
/// obtains a connection by listening is often referred to as the *listener* and the
/// peer that initiated the connection through dialing as the *dialer*, in
/// contrast to the traditional roles of *server* and *client*.
///
/// Most transports also provide a form of reliable delivery on the established
/// connections but the precise semantics of these guarantees depend on the
/// specific transport.
///
/// This trait is implemented for concrete connection-oriented transport protocols
/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
/// functionality to the dialing or listening process (e.g. name resolution via
/// the DNS).
#[async_trait]
pub trait Transport: Send {
    /// The result of a connection setup process, including protocol upgrades.
    ///
    /// Typically the output contains at least a handle to a data stream (i.e. a
    /// connection or a substream multiplexer on top of a connection) that
    /// provides APIs for sending and receiving data through the connection.
    type Output;

    /// Listens on the given [`Multiaddr`], producing an [`IListener`] which can be used
    /// to accept new inbound connections.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] when the underlying transport fails to set up
    /// the listener.
    fn listen_on(&mut self, addr: Multiaddr) -> Result<IListener<Self::Output>, TransportError>;

    /// Dials the given [`Multiaddr`], returning an outbound connection.
    ///
    /// # Errors
    ///
    /// If [`TransportError::MultiaddrNotSupported`] is returned, it means a wrong
    /// transport is used to dial for the address.
    async fn dial(&mut self, addr: Multiaddr) -> Result<Self::Output, TransportError>;

    /// Clones the transport and returns the clone as a boxed trait object
    /// ([`ITransport`]).
    fn box_clone(&self) -> ITransport<Self::Output>;

    /// Returns the [`Multiaddr`] protocols supported by the transport.
    ///
    /// In general, a transport supports some concrete protocols, e.g. TCP transport for TCP.
    /// There should always be a match between the transport and the given [`Multiaddr`]
    /// to dial/listen. Otherwise, [`TransportError::MultiaddrNotSupported`] is returned.
    fn protocols(&self) -> Vec<u32>;

    /// Adds a timeout to the connection setup (including upgrades) for all
    /// inbound and outbound connections established through the transport.
    ///
    /// Consumes `self` and returns it wrapped in a [`timeout::TransportTimeout`].
    fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
    where
        Self: Sized,
    {
        timeout::TransportTimeout::new(self, timeout)
    }

    /// Adds a timeout to the connection setup (including upgrades) for all outbound
    /// connections established through the transport.
    ///
    /// Consumes `self` and returns it wrapped in a [`timeout::TransportTimeout`].
    fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
    where
        Self: Sized,
    {
        timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
    }

    /// Adds a timeout to the connection setup (including upgrades) for all inbound
    /// connections established through the transport.
    ///
    /// Consumes `self` and returns it wrapped in a [`timeout::TransportTimeout`].
    fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
    where
        Self: Sized,
    {
        timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
    }
}
123
/// Event produced by [`TransportListener::accept`].
///
/// Transports are expected to produce `Accepted` events only for
/// listen addresses which have previously been announced via
/// an `AddressAdded` event and which have not been invalidated by
/// an `AddressDeleted` event yet.
#[derive(Clone, Debug)]
pub enum ListenerEvent<TOutput> {
    /// A new additional [`Multiaddr`] has been added.
    AddressAdded(Multiaddr),
    /// A [`Multiaddr`] is no longer existent.
    AddressDeleted(Multiaddr),
    /// A `TOutput` (i.e. an inbound connection) has been accepted.
    Accepted(TOutput),
}
139
140impl<TOutput> ListenerEvent<TOutput> {
141 /// In case this [`ListenerEvent`] is an Accpeted(), apply the given function
142 /// produce another listener event based the the function's result.
143 pub fn map<U>(self, f: impl FnOnce(TOutput) -> Result<U, TransportError>) -> Result<ListenerEvent<U>, TransportError> {
144 match self {
145 ListenerEvent::Accepted(o) => f(o).map(ListenerEvent::Accepted),
146 ListenerEvent::AddressAdded(a) => Ok(ListenerEvent::AddressAdded(a)),
147 ListenerEvent::AddressDeleted(a) => Ok(ListenerEvent::AddressDeleted(a)),
148 }
149 }
150
151 /// Returns `true` if this is a `AddressAdded` listener event.
152 pub fn is_address_added(&self) -> bool {
153 matches!(self, ListenerEvent::AddressAdded(_))
154 }
155
156 /// Try to turn this listener event into the `AddressAdded` part.
157 ///
158 /// Returns `None` if the event is not actually a `AddressAdded`,
159 /// otherwise the address.
160 pub fn into_new_address(self) -> Option<Multiaddr> {
161 if let ListenerEvent::AddressAdded(a) = self {
162 Some(a)
163 } else {
164 None
165 }
166 }
167
168 /// Returns `true` if this is an `AddressExpired` listener event.
169 pub fn is_address_deleted(&self) -> bool {
170 matches!(self, ListenerEvent::AddressDeleted(_))
171 }
172
173 /// Try to turn this listener event into the `AddressDeleted` part.
174 ///
175 /// Returns `None` if the event is not actually a `AddressDeleted`,
176 /// otherwise the address.
177 pub fn into_address_deleted(self) -> Option<Multiaddr> {
178 if let ListenerEvent::AddressDeleted(a) = self {
179 Some(a)
180 } else {
181 None
182 }
183 }
184}
185
/// A listener that accepts inbound connections for a [`Transport`].
///
/// Boxed listeners ([`IListener`]) are what [`Transport::listen_on`] returns.
#[async_trait]
pub trait TransportListener: Send {
    /// The result of a connection setup process, including protocol upgrades.
    ///
    /// Typically the output contains at least a handle to a data stream (i.e. a
    /// connection or a substream multiplexer on top of a connection) that
    /// provides APIs for sending and receiving data through the connection.
    type Output: Send;

    /// Waits for the next [`ListenerEvent`] of this listener.
    ///
    /// The event is either an accepted inbound connection or a change of the
    /// listen addresses.
    async fn accept(&mut self) -> Result<ListenerEvent<Self::Output>, TransportError>;

    /// Returns the local addresses being listened on.
    ///
    /// This might be `None` if it is listening on an unspecified address. The actual
    /// addresses will be reported by ListenerEvent::AddressAdded in this case.
    fn multi_addr(&self) -> Option<&Multiaddr>;

    /// Borrows the listener mutably and wraps it in an [`Incoming`] value.
    fn incoming(&mut self) -> Incoming<Self>
    where
        Self: Sized,
    {
        Incoming(self)
    }
    // /// Returns the local network address
    // fn local_addr(&self) -> io::Result<SocketAddr>;

    /// Waits for the next accepted inbound connection.
    ///
    /// Calls [`accept`](TransportListener::accept) in a loop, discarding every event
    /// that is not `ListenerEvent::Accepted`, and returns the output of the first
    /// accepted connection.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `accept`.
    async fn accept_output(&mut self) -> Result<Self::Output, TransportError> {
        loop {
            // Address events are skipped; only an accepted connection ends the loop.
            if let ListenerEvent::Accepted(o) = self.accept().await? {
                break Ok(o);
            }
        }
    }
}
222
/// Boxed trait object for a [`TransportListener`] producing `TOutput` connections.
pub type IListener<TOutput> = Box<dyn TransportListener<Output = TOutput> + Send>;
/// Boxed trait object for a [`Transport`] producing `TOutput` connections.
pub type ITransport<TOutput> = Box<dyn Transport<Output = TOutput> + Send>;
227
/// Makes boxed transports cloneable by delegating to `box_clone`.
///
/// Note that the impl is only available when `TOutput` implements [`ConnectionInfo`].
impl<TOutput: ConnectionInfo> Clone for ITransport<TOutput> {
    /// Returns a new boxed transport obtained from `box_clone`.
    fn clone(&self) -> Self {
        self.box_clone()
    }
}
233
234pub struct Incoming<'a, T>(&'a mut T);
235
236/// Implements Stream for Listener
237///
238///
239impl<'a, T> Stream for Incoming<'a, T>
240where
241 T: TransportListener,
242{
243 type Item = Result<ListenerEvent<T::Output>, TransportError>;
244
245 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
246 let future = self.0.accept();
247 futures::pin_mut!(future);
248
249 let evt = futures::ready!(future.poll(cx))?;
250 Poll::Ready(Some(Ok(evt)))
251 }
252}
253
/// Address information of an established connection.
///
/// This trait is meant to bound `Transport::Output`; it requires `Send` because
/// `Transport::Output` must be `Send`.
pub trait ConnectionInfo: Send {
    /// Returns the [`Multiaddr`] of the local end of the connection.
    fn local_multiaddr(&self) -> Multiaddr;
    /// Returns the [`Multiaddr`] of the remote end of the connection.
    fn remote_multiaddr(&self) -> Multiaddr;
}
261
/// An error during [dialing][Transport::dial] or [accepting][TransportListener::accept]
/// on a [`Transport`].
#[derive(Debug)]
pub enum TransportError {
    /// The [`Multiaddr`] passed as parameter is not supported.
    ///
    /// Contains back the same address.
    MultiaddrNotSupported(Multiaddr),

    /// The connection can not be established in time.
    Timeout,

    /// The memory transport is unreachable.
    Unreachable,

    /// Internal error.
    Internal,

    /// Routing error.
    Routing(Box<dyn Error + Send + Sync>),

    /// Any IO error that a [`Transport`] may produce.
    IoError(std::io::Error),

    /// Failed to find any IP address for this DNS address.
    ///
    /// Contains the name that could not be resolved.
    ResolveFail(String),

    /// Multistream selection error.
    NegotiationError(NegotiationError),

    /// Pnet layer error.
    ProtectorError(PnetError),

    /// Security layer error.
    SecurityError(Box<dyn Error + Send + Sync>),

    /// StreamMuxer layer error.
    StreamMuxerError(Box<dyn Error + Send + Sync>),

    /// Websocket error.
    WsError(Box<dyn Error + Send + Sync>),
}
304
305impl From<std::io::Error> for TransportError {
306 /// Converts IO error to TransportError
307 fn from(e: std::io::Error) -> Self {
308 TransportError::IoError(e)
309 }
310}
311
312impl From<NegotiationError> for TransportError {
313 fn from(e: NegotiationError) -> Self {
314 TransportError::NegotiationError(e)
315 }
316}
317
318impl From<PnetError> for TransportError {
319 fn from(e: PnetError) -> Self {
320 TransportError::ProtectorError(e)
321 }
322}
323
324impl fmt::Display for TransportError {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 match self {
327 TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
328 TransportError::Timeout => write!(f, "Operation timeout"),
329 TransportError::Unreachable => write!(f, "Memory transport unreachable"),
330 TransportError::Internal => write!(f, "Internal error"),
331 TransportError::Routing(err) => write!(f, "Routing layer error {:?}", err),
332 TransportError::IoError(err) => write!(f, "IO error {}", err),
333 TransportError::ResolveFail(name) => write!(f, "resolve dns {} failed", name),
334 TransportError::NegotiationError(err) => write!(f, "Negotiation error {:?}", err),
335 TransportError::ProtectorError(err) => write!(f, "Protector error {:?}", err),
336 TransportError::SecurityError(err) => write!(f, "SecurityError layer error {:?}", err),
337 TransportError::StreamMuxerError(err) => write!(f, "StreamMuxerError layer error {:?}", err),
338 TransportError::WsError(err) => write!(f, "Websocket transport error: {}", err),
339 }
340 }
341}
342
343impl Error for TransportError {
344 fn source(&self) -> Option<&(dyn Error + 'static)> {
345 match self {
346 TransportError::MultiaddrNotSupported(_) => None,
347 TransportError::Timeout => None,
348 TransportError::Unreachable => None,
349 TransportError::Internal => None,
350 TransportError::Routing(err) => Some(&**err),
351 TransportError::IoError(err) => Some(err),
352 TransportError::ResolveFail(_) => None,
353 TransportError::NegotiationError(err) => Some(err),
354 TransportError::ProtectorError(err) => Some(err),
355 TransportError::SecurityError(err) => Some(&**err),
356 TransportError::StreamMuxerError(err) => Some(&**err),
357 TransportError::WsError(err) => Some(&**err),
358 }
359 }
360}