libp2prs_core/
transport.rs

1// Copyright 2017-2018 Parity Technologies (UK) Ltd.
2// Copyright 2020 Netwarps Ltd.
3//
4// Permission is hereby granted, free of charge, to any person obtaining a
5// copy of this software and associated documentation files (the "Software"),
6// to deal in the Software without restriction, including without limitation
7// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8// and/or sell copies of the Software, and to permit persons to whom the
9// Software is furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20// DEALINGS IN THE SOFTWARE.
21
22//! Connection-oriented communication channels.
23//!
24//! The main entity of this module is the [`Transport`] trait, which provides an
25//! interface for establishing connections with other nodes, thereby negotiating
26//! any desired protocols.
27
28use async_trait::async_trait;
29use futures::prelude::*;
30use std::pin::Pin;
31use std::task::{Context, Poll};
32use std::time::Duration;
33use std::{error::Error, fmt};
34
35use libp2prs_multiaddr::Multiaddr;
36
37use crate::multistream::NegotiationError;
38use libp2p_pnet::PnetError;
39
40pub mod dummy;
41pub mod memory;
42pub mod protector;
43pub mod timeout;
44pub mod upgrade;
45
46/// A transport provides connection-oriented communication between two peers
47/// through ordered streams of data (i.e. connections).
48///
49/// Connections are established either by [accepting](Transport::IListener::accept)
50/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
51/// obtains a connection by listening is often referred to as the *listener* and the
52/// peer that initiated the connection through dialing as the *dialer*, in
53/// contrast to the traditional roles of *server* and *client*.
54///
55/// Most transports also provide a form of reliable delivery on the established
56/// connections but the precise semantics of these guarantees depend on the
57/// specific transport.
58///
59/// This trait is implemented for concrete connection-oriented transport protocols
60/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
61/// functionality to the dialing or listening process (e.g. name resolution via
62/// the DNS).
63///
64
65#[async_trait]
66pub trait Transport: Send {
67    /// The result of a connection setup process, including protocol upgrades.
68    ///
69    /// Typically the output contains at least a handle to a data stream (i.e. a
70    /// connection or a substream multiplexer on top of a connection) that
71    /// provides APIs for sending and receiving data through the connection.
72    type Output;
73
74    /// Listens on the given [`Multiaddr`], producing a IListener which can be used to accept
75    /// new inbound connections.
76    ///
77    /// Returning an error when there is underlying error in transport.
78    fn listen_on(&mut self, addr: Multiaddr) -> Result<IListener<Self::Output>, TransportError>;
79
80    /// Dials the given [`Multiaddr`], returning a outbound connection.
81    ///
82    /// If [`TransportError::MultiaddrNotSupported`] is returned, it means a wrong transport is
83    /// used to dial for the address.
84    async fn dial(&mut self, addr: Multiaddr) -> Result<Self::Output, TransportError>;
85
86    /// Clones the transport and returns the trait object.
87    fn box_clone(&self) -> ITransport<Self::Output>;
88
89    /// Returns the [`Multiaddr`] protocol supported by the transport.
90    ///
91    /// In general, transport supports some concrete protocols, e.g. TCP transport for TCP.
92    /// It should always be a match between the transport and the given [`Multiaddr`] to dial/listen.
93    /// Otherwise, [`TransportError::MultiaddrNotSupported`] is returned.
94    fn protocols(&self) -> Vec<u32>;
95
96    /// Adds a timeout to the connection setup (including upgrades) for all
97    /// inbound and outbound connections established through the transport.
98    fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
99    where
100        Self: Sized,
101    {
102        timeout::TransportTimeout::new(self, timeout)
103    }
104
105    /// Adds a timeout to the connection setup (including upgrades) for all outbound
106    /// connections established through the transport.
107    fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
108    where
109        Self: Sized,
110    {
111        timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
112    }
113
114    /// Adds a timeout to the connection setup (including upgrades) for all inbound
115    /// connections established through the transport.
116    fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
117    where
118        Self: Sized,
119    {
120        timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
121    }
122}
123
124/// Event produced by [`Transport::Listener`]s.
125///
126/// Transports are expected to produce `Upgrade` events only for
127/// listen addresses which have previously been announced via
128/// a `NewAddress` event and which have not been invalidated by
129/// an `AddressExpired` event yet.
130#[derive(Clone, Debug)]
131pub enum ListenerEvent<TOutput> {
132    /// A new additional [`Multiaddr`] has been added.
133    AddressAdded(Multiaddr),
134    /// A [`Multiaddr`] is no longer existent.
135    AddressDeleted(Multiaddr),
136    /// A TOutput has been accepted.
137    Accepted(TOutput),
138}
139
140impl<TOutput> ListenerEvent<TOutput> {
141    /// In case this [`ListenerEvent`] is an Accpeted(), apply the given function
142    /// produce another listener event based the the function's result.
143    pub fn map<U>(self, f: impl FnOnce(TOutput) -> Result<U, TransportError>) -> Result<ListenerEvent<U>, TransportError> {
144        match self {
145            ListenerEvent::Accepted(o) => f(o).map(ListenerEvent::Accepted),
146            ListenerEvent::AddressAdded(a) => Ok(ListenerEvent::AddressAdded(a)),
147            ListenerEvent::AddressDeleted(a) => Ok(ListenerEvent::AddressDeleted(a)),
148        }
149    }
150
151    /// Returns `true` if this is a `AddressAdded` listener event.
152    pub fn is_address_added(&self) -> bool {
153        matches!(self, ListenerEvent::AddressAdded(_))
154    }
155
156    /// Try to turn this listener event into the `AddressAdded` part.
157    ///
158    /// Returns `None` if the event is not actually a `AddressAdded`,
159    /// otherwise the address.
160    pub fn into_new_address(self) -> Option<Multiaddr> {
161        if let ListenerEvent::AddressAdded(a) = self {
162            Some(a)
163        } else {
164            None
165        }
166    }
167
168    /// Returns `true` if this is an `AddressExpired` listener event.
169    pub fn is_address_deleted(&self) -> bool {
170        matches!(self, ListenerEvent::AddressDeleted(_))
171    }
172
173    /// Try to turn this listener event into the `AddressDeleted` part.
174    ///
175    /// Returns `None` if the event is not actually a `AddressDeleted`,
176    /// otherwise the address.
177    pub fn into_address_deleted(self) -> Option<Multiaddr> {
178        if let ListenerEvent::AddressDeleted(a) = self {
179            Some(a)
180        } else {
181            None
182        }
183    }
184}
185
186#[async_trait]
187pub trait TransportListener: Send {
188    /// The result of a connection setup process, including protocol upgrades.
189    ///
190    /// Typically the output contains at least a handle to a data stream (i.e. a
191    /// connection or a substream multiplexer on top of a connection) that
192    /// provides APIs for sending and receiving data through the connection.
193    type Output: Send;
194
195    /// The Listener handles the inbound connections
196    async fn accept(&mut self) -> Result<ListenerEvent<Self::Output>, TransportError>;
197
198    /// Returns the local addresses being listened on.
199    ///
200    /// This might be `None` if it is listening on an unspecified address. The actual
201    /// addresses will be reported by ListenerEvent::AddressAdded in this case.
202    fn multi_addr(&self) -> Option<&Multiaddr>;
203
204    fn incoming(&mut self) -> Incoming<Self>
205    where
206        Self: Sized,
207    {
208        Incoming(self)
209    }
210    // /// Returns the local network address
211    // fn local_addr(&self) -> io::Result<SocketAddr>;
212
213    /// The Listener handles the inbound connections
214    async fn accept_output(&mut self) -> Result<Self::Output, TransportError> {
215        loop {
216            if let ListenerEvent::Accepted(o) = self.accept().await? {
217                break Ok(o);
218            }
219        }
220    }
221}
222
223/// Trait object for `TransportListener`
224pub type IListener<TOutput> = Box<dyn TransportListener<Output = TOutput> + Send>;
225/// Trait object for `Transport`
226pub type ITransport<TOutput> = Box<dyn Transport<Output = TOutput> + Send>;
227
228impl<TOutput: ConnectionInfo> Clone for ITransport<TOutput> {
229    fn clone(&self) -> Self {
230        self.box_clone()
231    }
232}
233
234pub struct Incoming<'a, T>(&'a mut T);
235
236/// Implements Stream for Listener
237///
238///
239impl<'a, T> Stream for Incoming<'a, T>
240where
241    T: TransportListener,
242{
243    type Item = Result<ListenerEvent<T::Output>, TransportError>;
244
245    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
246        let future = self.0.accept();
247        futures::pin_mut!(future);
248
249        let evt = futures::ready!(future.poll(cx))?;
250        Poll::Ready(Some(Ok(evt)))
251    }
252}
253
254/// The trait for the connection, which is bound by Transport::Output
255/// mark as 'Send' due to Transport::Output must be 'Send'
256///
257pub trait ConnectionInfo: Send {
258    fn local_multiaddr(&self) -> Multiaddr;
259    fn remote_multiaddr(&self) -> Multiaddr;
260}
261
262/// An error during [dialing][Transport::dial] or [accepting][Transport::IListener::accept]
263/// on a [`Transport`].
264#[derive(Debug)]
265pub enum TransportError {
266    /// The [`Multiaddr`] passed as parameter is not supported.
267    ///
268    /// Contains back the same address.
269    MultiaddrNotSupported(Multiaddr),
270
271    /// The connection can not be established in time.
272    Timeout,
273
274    /// The memory transport is unreachable.
275    Unreachable,
276
277    /// Internal error
278    Internal,
279
280    /// Routing error.
281    Routing(Box<dyn Error + Send + Sync>),
282
283    /// Any IO error that a [`Transport`] may produce.
284    IoError(std::io::Error),
285
286    /// Failed to find any IP address for this DNS address.
287    ResolveFail(String),
288
289    /// Multistream selection error.
290    NegotiationError(NegotiationError),
291
292    /// Pnet layer error.
293    ProtectorError(PnetError),
294
295    /// Security layer error.
296    SecurityError(Box<dyn Error + Send + Sync>),
297
298    /// StreamMuxer layer error
299    StreamMuxerError(Box<dyn Error + Send + Sync>),
300
301    /// websocket error
302    WsError(Box<dyn Error + Send + Sync>),
303}
304
305impl From<std::io::Error> for TransportError {
306    /// Converts IO error to TransportError
307    fn from(e: std::io::Error) -> Self {
308        TransportError::IoError(e)
309    }
310}
311
312impl From<NegotiationError> for TransportError {
313    fn from(e: NegotiationError) -> Self {
314        TransportError::NegotiationError(e)
315    }
316}
317
318impl From<PnetError> for TransportError {
319    fn from(e: PnetError) -> Self {
320        TransportError::ProtectorError(e)
321    }
322}
323
324impl fmt::Display for TransportError {
325    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326        match self {
327            TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
328            TransportError::Timeout => write!(f, "Operation timeout"),
329            TransportError::Unreachable => write!(f, "Memory transport unreachable"),
330            TransportError::Internal => write!(f, "Internal error"),
331            TransportError::Routing(err) => write!(f, "Routing layer error {:?}", err),
332            TransportError::IoError(err) => write!(f, "IO error {}", err),
333            TransportError::ResolveFail(name) => write!(f, "resolve dns {} failed", name),
334            TransportError::NegotiationError(err) => write!(f, "Negotiation error {:?}", err),
335            TransportError::ProtectorError(err) => write!(f, "Protector error {:?}", err),
336            TransportError::SecurityError(err) => write!(f, "SecurityError layer error {:?}", err),
337            TransportError::StreamMuxerError(err) => write!(f, "StreamMuxerError layer error {:?}", err),
338            TransportError::WsError(err) => write!(f, "Websocket transport  error: {}", err),
339        }
340    }
341}
342
343impl Error for TransportError {
344    fn source(&self) -> Option<&(dyn Error + 'static)> {
345        match self {
346            TransportError::MultiaddrNotSupported(_) => None,
347            TransportError::Timeout => None,
348            TransportError::Unreachable => None,
349            TransportError::Internal => None,
350            TransportError::Routing(err) => Some(&**err),
351            TransportError::IoError(err) => Some(err),
352            TransportError::ResolveFail(_) => None,
353            TransportError::NegotiationError(err) => Some(err),
354            TransportError::ProtectorError(err) => Some(err),
355            TransportError::SecurityError(err) => Some(&**err),
356            TransportError::StreamMuxerError(err) => Some(&**err),
357            TransportError::WsError(err) => Some(&**err),
358        }
359    }
360}