1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
// Copyright 2020 Netwarps Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Connection-oriented communication channels.
//!
//! The main entity of this module is the [`Transport`] trait, which provides an
//! interface for establishing connections with other nodes, thereby negotiating
//! any desired protocols.

use async_trait::async_trait;
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{error::Error, fmt};

use libp2prs_multiaddr::Multiaddr;

use crate::multistream::NegotiationError;
use crate::pnet::PnetError;

pub mod dummy;
pub mod memory;
pub mod protector;
pub mod timeout;
pub mod upgrade;

/// A transport provides connection-oriented communication between two peers
/// through ordered streams of data (i.e. connections).
///
/// Connections are established either by [accepting](TransportListener::accept)
/// on a listener obtained from [`Transport::listen_on`], or by
/// [dialing](Transport::dial) on a [`Transport`]. A peer that
/// obtains a connection by listening is often referred to as the *listener* and the
/// peer that initiated the connection through dialing as the *dialer*, in
/// contrast to the traditional roles of *server* and *client*.
///
/// Most transports also provide a form of reliable delivery on the established
/// connections but the precise semantics of these guarantees depend on the
/// specific transport.
///
/// This trait is implemented for concrete connection-oriented transport protocols
/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
/// functionality to the dialing or listening process (e.g. name resolution via
/// the DNS).
#[async_trait]
pub trait Transport: Send {
    /// The result of a connection setup process, including protocol upgrades.
    ///
    /// Typically the output contains at least a handle to a data stream (i.e. a
    /// connection or a substream multiplexer on top of a connection) that
    /// provides APIs for sending and receiving data through the connection.
    type Output;

    /// Listens on the given [`Multiaddr`], producing an [`IListener`] which can be used
    /// to accept new inbound connections.
    ///
    /// # Errors
    ///
    /// Returns a [`TransportError`] when the underlying transport fails to set up
    /// the listener.
    fn listen_on(&mut self, addr: Multiaddr) -> Result<IListener<Self::Output>, TransportError>;

    /// Dials the given [`Multiaddr`], returning an outbound connection.
    ///
    /// # Errors
    ///
    /// If [`TransportError::MultiaddrNotSupported`] is returned, it means the wrong
    /// transport was used to dial the address.
    async fn dial(&mut self, addr: Multiaddr) -> Result<Self::Output, TransportError>;

    /// Clones the transport and returns the copy as a boxed trait object.
    fn box_clone(&self) -> ITransport<Self::Output>;

    /// Returns the [`Multiaddr`] protocols supported by the transport.
    ///
    /// In general, a transport supports some concrete protocols, e.g. the TCP transport
    /// supports TCP. The transport and the [`Multiaddr`] given to dial/listen should
    /// always match; otherwise [`TransportError::MultiaddrNotSupported`] is returned.
    ///
    /// NOTE(review): the `u32` values are presumably multiaddr protocol codes — confirm
    /// against the implementations.
    fn protocols(&self) -> Vec<u32>;

    /// Adds a timeout to the connection setup (including upgrades) for all
    /// inbound and outbound connections established through the transport.
    ///
    /// Consumes the transport and wraps it in a [`timeout::TransportTimeout`].
    fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
    where
        Self: Sized,
    {
        timeout::TransportTimeout::new(self, timeout)
    }

    /// Adds a timeout to the connection setup (including upgrades) for all outbound
    /// connections established through the transport.
    ///
    /// Consumes the transport and wraps it in a [`timeout::TransportTimeout`].
    fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
    where
        Self: Sized,
    {
        timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
    }

    /// Adds a timeout to the connection setup (including upgrades) for all inbound
    /// connections established through the transport.
    ///
    /// Consumes the transport and wraps it in a [`timeout::TransportTimeout`].
    fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
    where
        Self: Sized,
    {
        timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
    }
}

/// Event produced by a [`TransportListener`].
///
/// Transports are expected to produce `Accepted` events only for
/// listen addresses which have previously been announced via
/// an `AddressAdded` event and which have not been invalidated by
/// an `AddressDeleted` event yet.
#[derive(Clone, Debug)]
pub enum ListenerEvent<TOutput> {
    /// A new additional [`Multiaddr`] has been added.
    AddressAdded(Multiaddr),
    /// A [`Multiaddr`] no longer exists.
    AddressDeleted(Multiaddr),
    /// A `TOutput` has been accepted.
    Accepted(TOutput),
}

impl<TOutput> ListenerEvent<TOutput> {
    /// Transforms the payload of an `Accepted` event with the fallible function `f`.
    ///
    /// Address events are passed through unchanged and `f` is not invoked for them.
    /// If `f` fails, its error becomes the result of the whole call.
    pub fn map<U>(self, f: impl FnOnce(TOutput) -> Result<U, TransportError>) -> Result<ListenerEvent<U>, TransportError> {
        let mapped = match self {
            Self::AddressAdded(addr) => ListenerEvent::AddressAdded(addr),
            Self::AddressDeleted(addr) => ListenerEvent::AddressDeleted(addr),
            Self::Accepted(output) => ListenerEvent::Accepted(f(output)?),
        };
        Ok(mapped)
    }

    /// Returns `true` if this is an `AddressAdded` listener event.
    pub fn is_address_added(&self) -> bool {
        matches!(self, Self::AddressAdded(..))
    }

    /// Consumes the event and extracts the address of an `AddressAdded` event.
    ///
    /// Returns `None` for every other kind of event.
    pub fn into_new_address(self) -> Option<Multiaddr> {
        match self {
            Self::AddressAdded(addr) => Some(addr),
            _ => None,
        }
    }

    /// Returns `true` if this is an `AddressDeleted` listener event.
    pub fn is_address_deleted(&self) -> bool {
        matches!(self, Self::AddressDeleted(..))
    }

    /// Consumes the event and extracts the address of an `AddressDeleted` event.
    ///
    /// Returns `None` for every other kind of event.
    pub fn into_address_deleted(self) -> Option<Multiaddr> {
        match self {
            Self::AddressDeleted(addr) => Some(addr),
            _ => None,
        }
    }
}

/// A listener obtained from [`Transport::listen_on`] that reports inbound activity
/// as [`ListenerEvent`]s.
#[async_trait]
pub trait TransportListener: Send {
    /// The result of a connection setup process, including protocol upgrades.
    ///
    /// Typically the output contains at least a handle to a data stream (i.e. a
    /// connection or a substream multiplexer on top of a connection) that
    /// provides APIs for sending and receiving data through the connection.
    type Output: Send;

    /// Waits for the next [`ListenerEvent`] of this listener, which is either an
    /// address change or an accepted inbound connection.
    async fn accept(&mut self) -> Result<ListenerEvent<Self::Output>, TransportError>;

    /// Returns the local address being listened on.
    ///
    /// This might be `None` if it is listening on an unspecified address. The actual
    /// addresses will be reported by `ListenerEvent::AddressAdded` in this case.
    fn multi_addr(&self) -> Option<&Multiaddr>;

    /// Mutably borrows the listener as an [`Incoming`] stream of listener events.
    fn incoming(&mut self) -> Incoming<Self>
    where
        Self: Sized,
    {
        Incoming(self)
    }
    // /// Returns the local network address
    // fn local_addr(&self) -> io::Result<SocketAddr>;

    /// Waits for the next accepted inbound connection.
    ///
    /// Calls [`accept`](TransportListener::accept) repeatedly, discarding address
    /// events, until a connection is accepted. The first error from `accept` is
    /// returned immediately.
    async fn accept_output(&mut self) -> Result<Self::Output, TransportError> {
        loop {
            match self.accept().await? {
                ListenerEvent::Accepted(conn) => break Ok(conn),
                // Address notifications are not connections; keep waiting.
                ListenerEvent::AddressAdded(_) | ListenerEvent::AddressDeleted(_) => continue,
            }
        }
    }
}

/// Boxed trait object for a [`TransportListener`] whose connections are of type `TOutput`.
pub type IListener<TOutput> = Box<dyn TransportListener<Output = TOutput> + Send>;
/// Boxed trait object for a [`Transport`] whose connections are of type `TOutput`.
pub type ITransport<TOutput> = Box<dyn Transport<Output = TOutput> + Send>;

/// Makes boxed transports cloneable by delegating to `box_clone`.
///
/// The impl is only provided for outputs that implement [`ConnectionInfo`].
impl<TOutput: ConnectionInfo> Clone for ITransport<TOutput> {
    fn clone(&self) -> Self {
        // A trait object cannot be cloned generically, so the concrete transport
        // produces its own boxed copy.
        self.box_clone()
    }
}

/// Stream adapter returned by [`TransportListener::incoming`].
///
/// Holds a mutable borrow of the listener for the lifetime `'a`.
pub struct Incoming<'a, T>(&'a mut T);

/// Exposes a [`TransportListener`] as a [`Stream`] of listener events.
///
/// Every call to `poll_next` creates a fresh `accept()` future, polls it once and
/// drops it again. The stream never yields `None`.
///
/// NOTE(review): dropping a pending `accept()` future assumes the listener's `accept`
/// tolerates being cancelled and restarted — confirm for each implementation.
impl<'a, T> Stream for Incoming<'a, T>
where
    T: TransportListener,
{
    type Item = Result<ListenerEvent<T::Output>, TransportError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let accepting = self.0.accept();
        futures::pin_mut!(accepting);

        match accepting.poll(cx) {
            Poll::Pending => Poll::Pending,
            // Both events and errors are forwarded as stream items.
            Poll::Ready(Ok(event)) => Poll::Ready(Some(Ok(event))),
            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
        }
    }
}

/// Trait for connections, intended as the bound on [`Transport::Output`].
///
/// It requires `Send` because the transport output must be `Send`.
pub trait ConnectionInfo: Send {
    /// Returns a [`Multiaddr`] for the local side of the connection.
    ///
    /// NOTE(review): meaning inferred from the method name — confirm with implementors.
    fn local_multiaddr(&self) -> Multiaddr;
    /// Returns a [`Multiaddr`] for the remote side of the connection.
    ///
    /// NOTE(review): meaning inferred from the method name — confirm with implementors.
    fn remote_multiaddr(&self) -> Multiaddr;
}

/// An error during [dialing][Transport::dial] or [accepting][TransportListener::accept]
/// on a [`Transport`].
#[derive(Debug)]
pub enum TransportError {
    /// The [`Multiaddr`] passed as parameter is not supported.
    ///
    /// Carries the rejected address back to the caller.
    MultiaddrNotSupported(Multiaddr),

    /// The connection could not be established in time.
    Timeout,

    /// The memory transport is unreachable.
    Unreachable,

    /// Routing to the peer is not available; the string carries details.
    Routing(String),

    /// Internal error.
    Internal,

    /// Any IO error that a [`Transport`] may produce.
    IoError(std::io::Error),

    /// Failed to find any IP address for this DNS address; the string is the name
    /// that could not be resolved.
    ResolveFail(String),

    /// Multistream selection error.
    NegotiationError(NegotiationError),

    /// Pnet layer error.
    ProtectorError(PnetError),

    /// Security layer error.
    SecurityError(Box<dyn Error + Send + Sync>),

    /// StreamMuxer layer error.
    StreamMuxerError(Box<dyn Error + Send + Sync>),

    /// Websocket error.
    WsError(Box<dyn Error + Send + Sync>),
}

impl From<std::io::Error> for TransportError {
    /// Converts IO error to TransportError
    fn from(e: std::io::Error) -> Self {
        TransportError::IoError(e)
    }
}

impl From<NegotiationError> for TransportError {
    fn from(e: NegotiationError) -> Self {
        TransportError::NegotiationError(e)
    }
}

impl From<PnetError> for TransportError {
    fn from(e: PnetError) -> Self {
        TransportError::ProtectorError(e)
    }
}

impl fmt::Display for TransportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
            TransportError::Timeout => write!(f, "Operation timeout"),
            TransportError::Routing(s) => write!(f, "Routing not available {}", s),
            TransportError::Unreachable => write!(f, "Memory transport unreachable"),
            TransportError::Internal => write!(f, "Internal error"),
            TransportError::IoError(err) => write!(f, "IO error {}", err),
            TransportError::ResolveFail(name) => write!(f, "resolve dns {} failed", name),
            TransportError::NegotiationError(err) => write!(f, "Negotiation error {:?}", err),
            TransportError::ProtectorError(err) => write!(f, "Protector error {:?}", err),
            TransportError::SecurityError(err) => write!(f, "SecurityError layer error {:?}", err),
            TransportError::StreamMuxerError(err) => write!(f, "StreamMuxerError layer error {:?}", err),
            TransportError::WsError(err) => write!(f, "Websocket transport  error: {}", err),
        }
    }
}

impl Error for TransportError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            TransportError::MultiaddrNotSupported(_) => None,
            TransportError::Timeout => None,
            TransportError::Unreachable => None,
            TransportError::Routing(_) => None,
            TransportError::Internal => None,
            TransportError::IoError(err) => Some(err),
            TransportError::ResolveFail(_) => None,
            TransportError::NegotiationError(err) => Some(err),
            TransportError::ProtectorError(err) => Some(err),
            TransportError::SecurityError(err) => Some(&**err),
            TransportError::StreamMuxerError(err) => Some(&**err),
            TransportError::WsError(err) => Some(&**err),
        }
    }
}