cs_mwc_libp2p_core/transport.rs
1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Connection-oriented communication channels.
22//!
23//! The main entity of this module is the [`Transport`] trait, which provides an
24//! interface for establishing connections with other nodes, thereby negotiating
25//! any desired protocols. The rest of the module defines combinators for
26//! modifying a transport through composition with other transports or protocol upgrades.
27
28use crate::ConnectedPoint;
29use futures::prelude::*;
30use multiaddr::Multiaddr;
31use std::{error::Error, fmt};
32
33pub mod and_then;
34pub mod choice;
35pub mod dummy;
36pub mod map;
37pub mod map_err;
38pub mod memory;
39pub mod timeout;
40pub mod upgrade;
41
42mod boxed;
43mod optional;
44
45pub use self::boxed::Boxed;
46pub use self::choice::OrTransport;
47pub use self::memory::MemoryTransport;
48pub use self::optional::OptionalTransport;
49pub use self::upgrade::Upgrade;
50
51/// A transport provides connection-oriented communication between two peers
52/// through ordered streams of data (i.e. connections).
53///
54/// Connections are established either by [listening](Transport::listen_on)
55/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
56/// obtains a connection by listening is often referred to as the *listener* and the
57/// peer that initiated the connection through dialing as the *dialer*, in
58/// contrast to the traditional roles of *server* and *client*.
59///
60/// Most transports also provide a form of reliable delivery on the established
61/// connections but the precise semantics of these guarantees depend on the
62/// specific transport.
63///
64/// This trait is implemented for concrete connection-oriented transport protocols
65/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
66/// functionality to the dialing or listening process (e.g. name resolution via
67/// the DNS).
68///
69/// Additional protocols can be layered on top of the connections established
70/// by a [`Transport`] through an upgrade mechanism that is initiated via
71/// [`upgrade`](Transport::upgrade).
72///
73/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
74/// > words, listening or dialing consumes the transport object. This has been designed
75/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly
76/// > on `Foo`.
77pub trait Transport {
78 /// The result of a connection setup process, including protocol upgrades.
79 ///
80 /// Typically the output contains at least a handle to a data stream (i.e. a
81 /// connection or a substream multiplexer on top of a connection) that
82 /// provides APIs for sending and receiving data through the connection.
83 type Output;
84
85 /// An error that occurred during connection setup.
86 type Error: Error;
87
88 /// A stream of [`Output`](Transport::Output)s for inbound connections.
89 ///
90 /// An item should be produced whenever a connection is received at the lowest level of the
91 /// transport stack. The item must be a [`ListenerUpgrade`](Transport::ListenerUpgrade) future
92 /// that resolves to an [`Output`](Transport::Output) value once all protocol upgrades
93 /// have been applied.
94 ///
95 /// If this stream produces an error, it is considered fatal and the listener is killed. It
96 /// is possible to report non-fatal errors by producing a [`ListenerEvent::Error`].
97 type Listener: Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
98
99 /// A pending [`Output`](Transport::Output) for an inbound connection,
100 /// obtained from the [`Listener`](Transport::Listener) stream.
101 ///
102 /// After a connection has been accepted by the transport, it may need to go through
103 /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
104 /// post-processing should not block the `Listener` from producing the next
105 /// connection, hence further connection setup proceeds asynchronously.
106 /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
107 /// of the connection setup process.
108 type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
109
110 /// A pending [`Output`](Transport::Output) for an outbound connection,
111 /// obtained from [dialing](Transport::dial).
112 type Dial: Future<Output = Result<Self::Output, Self::Error>>;
113
114 /// Listens on the given [`Multiaddr`], producing a stream of pending, inbound connections
115 /// and addresses this transport is listening on (cf. [`ListenerEvent`]).
116 ///
117 /// Returning an error from the stream is considered fatal. The listener can also report
118 /// non-fatal errors by producing a [`ListenerEvent::Error`].
119 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>>
120 where
121 Self: Sized;
122
123 /// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
124 ///
125 /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
126 /// try an alternative [`Transport`], if available.
127 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
128 where
129 Self: Sized;
130
131 /// Performs a transport-specific mapping of an address `observed` by
132 /// a remote onto a local `listen` address to yield an address for
133 /// the local node that may be reachable for other peers.
134 fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
135
136 /// Boxes the transport, including custom transport errors.
137 fn boxed(self) -> boxed::Boxed<Self::Output>
138 where
139 Self: Transport + Sized + Clone + Send + Sync + 'static,
140 Self::Dial: Send + 'static,
141 Self::Listener: Send + 'static,
142 Self::ListenerUpgrade: Send + 'static,
143 Self::Error: Send + Sync,
144 {
145 boxed::boxed(self)
146 }
147
148 /// Applies a function on the connections created by the transport.
149 fn map<F, O>(self, f: F) -> map::Map<Self, F>
150 where
151 Self: Sized,
152 F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone
153 {
154 map::Map::new(self, f)
155 }
156
157 /// Applies a function on the errors generated by the futures of the transport.
158 fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
159 where
160 Self: Sized,
161 F: FnOnce(Self::Error) -> E + Clone
162 {
163 map_err::MapErr::new(self, f)
164 }
165
166 /// Adds a fallback transport that is used when encountering errors
167 /// while establishing inbound or outbound connections.
168 ///
169 /// The returned transport will act like `self`, except that if `listen_on` or `dial`
170 /// return an error then `other` will be tried.
171 fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
172 where
173 Self: Sized,
174 U: Transport,
175 <U as Transport>::Error: 'static
176 {
177 OrTransport::new(self, other)
178 }
179
180 /// Applies a function producing an asynchronous result to every connection
181 /// created by this transport.
182 ///
183 /// This function can be used for ad-hoc protocol upgrades or
184 /// for processing or adapting the output for following configurations.
185 ///
186 /// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
187 fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
188 where
189 Self: Sized,
190 C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
191 F: TryFuture<Ok = O>,
192 <F as TryFuture>::Error: Error + 'static
193 {
194 and_then::AndThen::new(self, f)
195 }
196
197 /// Begins a series of protocol upgrades via an
198 /// [`upgrade::Builder`](upgrade::Builder).
199 fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
200 where
201 Self: Sized,
202 Self::Error: 'static
203 {
204 upgrade::Builder::new(self, version)
205 }
206}
207
208/// Event produced by [`Transport::Listener`]s.
209///
210/// Transports are expected to produce `Upgrade` events only for
211/// listen addresses which have previously been announced via
212/// a `NewAddress` event and which have not been invalidated by
213/// an `AddressExpired` event yet.
214#[derive(Clone, Debug, PartialEq)]
215pub enum ListenerEvent<TUpgr, TErr> {
216 /// The transport is listening on a new additional [`Multiaddr`].
217 NewAddress(Multiaddr),
218 /// An upgrade, consisting of the upgrade future, the listener address and the remote address.
219 Upgrade {
220 /// The upgrade.
221 upgrade: TUpgr,
222 /// The local address which produced this upgrade.
223 local_addr: Multiaddr,
224 /// The remote address which produced this upgrade.
225 remote_addr: Multiaddr
226 },
227 /// A [`Multiaddr`] is no longer used for listening.
228 AddressExpired(Multiaddr),
229 /// A non-fatal error has happened on the listener.
230 ///
231 /// This event should be generated in order to notify the user that something wrong has
232 /// happened. The listener, however, continues to run.
233 Error(TErr),
234}
235
236impl<TUpgr, TErr> ListenerEvent<TUpgr, TErr> {
237 /// In case this [`ListenerEvent`] is an upgrade, apply the given function
238 /// to the upgrade and multiaddress and produce another listener event
239 /// based the the function's result.
240 pub fn map<U>(self, f: impl FnOnce(TUpgr) -> U) -> ListenerEvent<U, TErr> {
241 match self {
242 ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
243 ListenerEvent::Upgrade { upgrade: f(upgrade), local_addr, remote_addr }
244 }
245 ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
246 ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
247 ListenerEvent::Error(e) => ListenerEvent::Error(e),
248 }
249 }
250
251 /// In case this [`ListenerEvent`] is an [`Error`](ListenerEvent::Error),
252 /// apply the given function to the error and produce another listener event based on the
253 /// function's result.
254 pub fn map_err<U>(self, f: impl FnOnce(TErr) -> U) -> ListenerEvent<TUpgr, U> {
255 match self {
256 ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } =>
257 ListenerEvent::Upgrade { upgrade, local_addr, remote_addr },
258 ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
259 ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
260 ListenerEvent::Error(e) => ListenerEvent::Error(f(e)),
261 }
262 }
263
264 /// Returns `true` if this is an `Upgrade` listener event.
265 pub fn is_upgrade(&self) -> bool {
266 matches!(self, ListenerEvent::Upgrade {..})
267 }
268
269 /// Try to turn this listener event into upgrade parts.
270 ///
271 /// Returns `None` if the event is not actually an upgrade,
272 /// otherwise the upgrade and the remote address.
273 pub fn into_upgrade(self) -> Option<(TUpgr, Multiaddr)> {
274 if let ListenerEvent::Upgrade { upgrade, remote_addr, .. } = self {
275 Some((upgrade, remote_addr))
276 } else {
277 None
278 }
279 }
280
281 /// Returns `true` if this is a `NewAddress` listener event.
282 pub fn is_new_address(&self) -> bool {
283 matches!(self, ListenerEvent::NewAddress(_))
284 }
285
286 /// Try to turn this listener event into the `NewAddress` part.
287 ///
288 /// Returns `None` if the event is not actually a `NewAddress`,
289 /// otherwise the address.
290 pub fn into_new_address(self) -> Option<Multiaddr> {
291 if let ListenerEvent::NewAddress(a) = self {
292 Some(a)
293 } else {
294 None
295 }
296 }
297
298 /// Returns `true` if this is an `AddressExpired` listener event.
299 pub fn is_address_expired(&self) -> bool {
300 matches!(self, ListenerEvent::AddressExpired(_))
301 }
302
303 /// Try to turn this listener event into the `AddressExpired` part.
304 ///
305 /// Returns `None` if the event is not actually a `AddressExpired`,
306 /// otherwise the address.
307 pub fn into_address_expired(self) -> Option<Multiaddr> {
308 if let ListenerEvent::AddressExpired(a) = self {
309 Some(a)
310 } else {
311 None
312 }
313 }
314
315 /// Returns `true` if this is an `Error` listener event.
316 pub fn is_error(&self) -> bool {
317 matches!(self, ListenerEvent::Error(_))
318 }
319
320 /// Try to turn this listener event into the `Error` part.
321 ///
322 /// Returns `None` if the event is not actually a `Error`,
323 /// otherwise the error.
324 pub fn into_error(self) -> Option<TErr> {
325 if let ListenerEvent::Error(err) = self {
326 Some(err)
327 } else {
328 None
329 }
330 }
331}
332
333/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
334/// on a [`Transport`].
335#[derive(Debug, Clone)]
336pub enum TransportError<TErr> {
337 /// The [`Multiaddr`] passed as parameter is not supported.
338 ///
339 /// Contains back the same address.
340 MultiaddrNotSupported(Multiaddr),
341
342 /// Any other error that a [`Transport`] may produce.
343 Other(TErr),
344}
345
346impl<TErr> TransportError<TErr> {
347 /// Applies a function to the the error in [`TransportError::Other`].
348 pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
349 match self {
350 TransportError::MultiaddrNotSupported(addr) => TransportError::MultiaddrNotSupported(addr),
351 TransportError::Other(err) => TransportError::Other(map(err)),
352 }
353 }
354}
355
356impl<TErr> fmt::Display for TransportError<TErr>
357where TErr: fmt::Display,
358{
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 match self {
361 TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
362 TransportError::Other(err) => write!(f, "{}", err),
363 }
364 }
365}
366
367impl<TErr> Error for TransportError<TErr>
368where TErr: Error + 'static,
369{
370 fn source(&self) -> Option<&(dyn Error + 'static)> {
371 match self {
372 TransportError::MultiaddrNotSupported(_) => None,
373 TransportError::Other(err) => Some(err),
374 }
375 }
376}