cs_mwc_libp2p_core/
transport.rs

1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Connection-oriented communication channels.
22//!
23//! The main entity of this module is the [`Transport`] trait, which provides an
24//! interface for establishing connections with other nodes, thereby negotiating
25//! any desired protocols. The rest of the module defines combinators for
26//! modifying a transport through composition with other transports or protocol upgrades.
27
28use crate::ConnectedPoint;
29use futures::prelude::*;
30use multiaddr::Multiaddr;
31use std::{error::Error, fmt};
32
33pub mod and_then;
34pub mod choice;
35pub mod dummy;
36pub mod map;
37pub mod map_err;
38pub mod memory;
39pub mod timeout;
40pub mod upgrade;
41
42mod boxed;
43mod optional;
44
45pub use self::boxed::Boxed;
46pub use self::choice::OrTransport;
47pub use self::memory::MemoryTransport;
48pub use self::optional::OptionalTransport;
49pub use self::upgrade::Upgrade;
50
51/// A transport provides connection-oriented communication between two peers
52/// through ordered streams of data (i.e. connections).
53///
54/// Connections are established either by [listening](Transport::listen_on)
55/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
56/// obtains a connection by listening is often referred to as the *listener* and the
57/// peer that initiated the connection through dialing as the *dialer*, in
58/// contrast to the traditional roles of *server* and *client*.
59///
60/// Most transports also provide a form of reliable delivery on the established
61/// connections but the precise semantics of these guarantees depend on the
62/// specific transport.
63///
64/// This trait is implemented for concrete connection-oriented transport protocols
65/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
66/// functionality to the dialing or listening process (e.g. name resolution via
67/// the DNS).
68///
69/// Additional protocols can be layered on top of the connections established
70/// by a [`Transport`] through an upgrade mechanism that is initiated via
71/// [`upgrade`](Transport::upgrade).
72///
73/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
74/// >           words, listening or dialing consumes the transport object. This has been designed
75/// >           so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly
76/// >           on `Foo`.
77pub trait Transport {
78    /// The result of a connection setup process, including protocol upgrades.
79    ///
80    /// Typically the output contains at least a handle to a data stream (i.e. a
81    /// connection or a substream multiplexer on top of a connection) that
82    /// provides APIs for sending and receiving data through the connection.
83    type Output;
84
85    /// An error that occurred during connection setup.
86    type Error: Error;
87
88    /// A stream of [`Output`](Transport::Output)s for inbound connections.
89    ///
90    /// An item should be produced whenever a connection is received at the lowest level of the
91    /// transport stack. The item must be a [`ListenerUpgrade`](Transport::ListenerUpgrade) future
92    /// that resolves to an [`Output`](Transport::Output) value once all protocol upgrades
93    /// have been applied.
94    ///
95    /// If this stream produces an error, it is considered fatal and the listener is killed. It
96    /// is possible to report non-fatal errors by producing a [`ListenerEvent::Error`].
97    type Listener: Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
98
99    /// A pending [`Output`](Transport::Output) for an inbound connection,
100    /// obtained from the [`Listener`](Transport::Listener) stream.
101    ///
102    /// After a connection has been accepted by the transport, it may need to go through
103    /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
104    /// post-processing should not block the `Listener` from producing the next
105    /// connection, hence further connection setup proceeds asynchronously.
106    /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
107    /// of the connection setup process.
108    type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;
109
110    /// A pending [`Output`](Transport::Output) for an outbound connection,
111    /// obtained from [dialing](Transport::dial).
112    type Dial: Future<Output = Result<Self::Output, Self::Error>>;
113
114    /// Listens on the given [`Multiaddr`], producing a stream of pending, inbound connections
115    /// and addresses this transport is listening on (cf. [`ListenerEvent`]).
116    ///
117    /// Returning an error from the stream is considered fatal. The listener can also report
118    /// non-fatal errors by producing a [`ListenerEvent::Error`].
119    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>>
120    where
121        Self: Sized;
122
123    /// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
124    ///
125    /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
126    /// try an alternative [`Transport`], if available.
127    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
128    where
129        Self: Sized;
130
131    /// Performs a transport-specific mapping of an address `observed` by
132    /// a remote onto a local `listen` address to yield an address for
133    /// the local node that may be reachable for other peers.
134    fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
135
136    /// Boxes the transport, including custom transport errors.
137    fn boxed(self) -> boxed::Boxed<Self::Output>
138    where
139        Self: Transport + Sized + Clone + Send + Sync + 'static,
140        Self::Dial: Send + 'static,
141        Self::Listener: Send + 'static,
142        Self::ListenerUpgrade: Send + 'static,
143        Self::Error: Send + Sync,
144    {
145        boxed::boxed(self)
146    }
147
148    /// Applies a function on the connections created by the transport.
149    fn map<F, O>(self, f: F) -> map::Map<Self, F>
150    where
151        Self: Sized,
152        F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone
153    {
154        map::Map::new(self, f)
155    }
156
157    /// Applies a function on the errors generated by the futures of the transport.
158    fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
159    where
160        Self: Sized,
161        F: FnOnce(Self::Error) -> E + Clone
162    {
163        map_err::MapErr::new(self, f)
164    }
165
166    /// Adds a fallback transport that is used when encountering errors
167    /// while establishing inbound or outbound connections.
168    ///
169    /// The returned transport will act like `self`, except that if `listen_on` or `dial`
170    /// return an error then `other` will be tried.
171    fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
172    where
173        Self: Sized,
174        U: Transport,
175        <U as Transport>::Error: 'static
176    {
177        OrTransport::new(self, other)
178    }
179
180    /// Applies a function producing an asynchronous result to every connection
181    /// created by this transport.
182    ///
183    /// This function can be used for ad-hoc protocol upgrades or
184    /// for processing or adapting the output for following configurations.
185    ///
186    /// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
187    fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
188    where
189        Self: Sized,
190        C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
191        F: TryFuture<Ok = O>,
192        <F as TryFuture>::Error: Error + 'static
193    {
194        and_then::AndThen::new(self, f)
195    }
196
197    /// Begins a series of protocol upgrades via an
198    /// [`upgrade::Builder`](upgrade::Builder).
199    fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
200    where
201        Self: Sized,
202        Self::Error: 'static
203    {
204        upgrade::Builder::new(self, version)
205    }
206}
207
208/// Event produced by [`Transport::Listener`]s.
209///
210/// Transports are expected to produce `Upgrade` events only for
211/// listen addresses which have previously been announced via
212/// a `NewAddress` event and which have not been invalidated by
213/// an `AddressExpired` event yet.
214#[derive(Clone, Debug, PartialEq)]
215pub enum ListenerEvent<TUpgr, TErr> {
216    /// The transport is listening on a new additional [`Multiaddr`].
217    NewAddress(Multiaddr),
218    /// An upgrade, consisting of the upgrade future, the listener address and the remote address.
219    Upgrade {
220        /// The upgrade.
221        upgrade: TUpgr,
222        /// The local address which produced this upgrade.
223        local_addr: Multiaddr,
224        /// The remote address which produced this upgrade.
225        remote_addr: Multiaddr
226    },
227    /// A [`Multiaddr`] is no longer used for listening.
228    AddressExpired(Multiaddr),
229    /// A non-fatal error has happened on the listener.
230    ///
231    /// This event should be generated in order to notify the user that something wrong has
232    /// happened. The listener, however, continues to run.
233    Error(TErr),
234}
235
236impl<TUpgr, TErr> ListenerEvent<TUpgr, TErr> {
237    /// In case this [`ListenerEvent`] is an upgrade, apply the given function
238    /// to the upgrade and multiaddress and produce another listener event
239    /// based the the function's result.
240    pub fn map<U>(self, f: impl FnOnce(TUpgr) -> U) -> ListenerEvent<U, TErr> {
241        match self {
242            ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
243                ListenerEvent::Upgrade { upgrade: f(upgrade), local_addr, remote_addr }
244            }
245            ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
246            ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
247            ListenerEvent::Error(e) => ListenerEvent::Error(e),
248        }
249    }
250
251    /// In case this [`ListenerEvent`] is an [`Error`](ListenerEvent::Error),
252    /// apply the given function to the error and produce another listener event based on the
253    /// function's result.
254    pub fn map_err<U>(self, f: impl FnOnce(TErr) -> U) -> ListenerEvent<TUpgr, U> {
255        match self {
256            ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } =>
257                ListenerEvent::Upgrade { upgrade, local_addr, remote_addr },
258            ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
259            ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
260            ListenerEvent::Error(e) => ListenerEvent::Error(f(e)),
261        }
262    }
263
264    /// Returns `true` if this is an `Upgrade` listener event.
265    pub fn is_upgrade(&self) -> bool {
266        matches!(self, ListenerEvent::Upgrade {..})
267    }
268
269    /// Try to turn this listener event into upgrade parts.
270    ///
271    /// Returns `None` if the event is not actually an upgrade,
272    /// otherwise the upgrade and the remote address.
273    pub fn into_upgrade(self) -> Option<(TUpgr, Multiaddr)> {
274        if let ListenerEvent::Upgrade { upgrade, remote_addr, .. } = self {
275            Some((upgrade, remote_addr))
276        } else {
277            None
278        }
279    }
280
281    /// Returns `true` if this is a `NewAddress` listener event.
282    pub fn is_new_address(&self) -> bool {
283        matches!(self, ListenerEvent::NewAddress(_))
284    }
285
286    /// Try to turn this listener event into the `NewAddress` part.
287    ///
288    /// Returns `None` if the event is not actually a `NewAddress`,
289    /// otherwise the address.
290    pub fn into_new_address(self) -> Option<Multiaddr> {
291        if let ListenerEvent::NewAddress(a) = self {
292            Some(a)
293        } else {
294            None
295        }
296    }
297
298    /// Returns `true` if this is an `AddressExpired` listener event.
299    pub fn is_address_expired(&self) -> bool {
300        matches!(self, ListenerEvent::AddressExpired(_))
301    }
302
303    /// Try to turn this listener event into the `AddressExpired` part.
304    ///
305    /// Returns `None` if the event is not actually a `AddressExpired`,
306    /// otherwise the address.
307    pub fn into_address_expired(self) -> Option<Multiaddr> {
308        if let ListenerEvent::AddressExpired(a) = self {
309            Some(a)
310        } else {
311            None
312        }
313    }
314
315    /// Returns `true` if this is an `Error` listener event.
316    pub fn is_error(&self) -> bool {
317        matches!(self, ListenerEvent::Error(_))
318    }
319
320    /// Try to turn this listener event into the `Error` part.
321    ///
322    /// Returns `None` if the event is not actually a `Error`,
323    /// otherwise the error.
324    pub fn into_error(self) -> Option<TErr> {
325        if let ListenerEvent::Error(err) = self {
326            Some(err)
327        } else {
328            None
329        }
330    }
331}
332
333/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
334/// on a [`Transport`].
335#[derive(Debug, Clone)]
336pub enum TransportError<TErr> {
337    /// The [`Multiaddr`] passed as parameter is not supported.
338    ///
339    /// Contains back the same address.
340    MultiaddrNotSupported(Multiaddr),
341
342    /// Any other error that a [`Transport`] may produce.
343    Other(TErr),
344}
345
346impl<TErr> TransportError<TErr> {
347    /// Applies a function to the the error in [`TransportError::Other`].
348    pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
349        match self {
350            TransportError::MultiaddrNotSupported(addr) => TransportError::MultiaddrNotSupported(addr),
351            TransportError::Other(err) => TransportError::Other(map(err)),
352        }
353    }
354}
355
356impl<TErr> fmt::Display for TransportError<TErr>
357where TErr: fmt::Display,
358{
359    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360        match self {
361            TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
362            TransportError::Other(err) => write!(f, "{}", err),
363        }
364    }
365}
366
367impl<TErr> Error for TransportError<TErr>
368where TErr: Error + 'static,
369{
370    fn source(&self) -> Option<&(dyn Error + 'static)> {
371        match self {
372            TransportError::MultiaddrNotSupported(_) => None,
373            TransportError::Other(err) => Some(err),
374        }
375    }
376}