mwc_libp2p_core/transport/
upgrade.rs

1// Copyright 2017-2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Configuration of transport protocol upgrades.
22
23pub use crate::upgrade::Version;
24
25use crate::{
26    ConnectedPoint,
27    Negotiated,
28    transport::{
29        Transport,
30        TransportError,
31        ListenerEvent,
32        and_then::AndThen,
33        boxed::boxed,
34        timeout::TransportTimeout,
35    },
36    muxing::{StreamMuxer, StreamMuxerBox},
37    upgrade::{
38        self,
39        OutboundUpgrade,
40        InboundUpgrade,
41        apply_inbound,
42        apply_outbound,
43        UpgradeError,
44        OutboundUpgradeApply,
45        InboundUpgradeApply
46    },
47    PeerId
48};
49use futures::{prelude::*, ready};
50use multiaddr::Multiaddr;
51use std::{
52    error::Error,
53    fmt,
54    pin::Pin,
55    task::{Context, Poll},
56    time::Duration
57};
58
59/// A `Builder` facilitates upgrading of a [`Transport`] for use with
60/// a [`Network`].
61///
62/// The upgrade process is defined by the following stages:
63///
64///    [`authenticate`](Builder::authenticate)`{1}`
65/// -> [`apply`](Authenticated::apply)`{*}`
66/// -> [`multiplex`](Authenticated::multiplex)`{1}`
67///
68/// It thus enforces the following invariants on every transport
69/// obtained from [`multiplex`](Authenticated::multiplex):
70///
71///   1. The transport must be [authenticated](Builder::authenticate)
72///      and [multiplexed](Authenticated::multiplex).
73///   2. Authentication must precede the negotiation of a multiplexer.
74///   3. Applying a multiplexer is the last step in the upgrade process.
75///   4. The [`Transport::Output`] conforms to the requirements of a [`Network`],
76///      namely a tuple of a [`PeerId`] (from the authentication upgrade) and a
77///      [`StreamMuxer`] (from the multiplexing upgrade).
78///
79/// [`Network`]: crate::Network
80#[derive(Clone)]
81pub struct Builder<T> {
82    inner: T,
83    version: upgrade::Version,
84}
85
86impl<T> Builder<T>
87where
88    T: Transport,
89    T::Error: 'static,
90{
91    /// Creates a `Builder` over the given (base) `Transport`.
92    pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
93        Builder { inner, version }
94    }
95
96    /// Upgrades the transport to perform authentication of the remote.
97    ///
98    /// The supplied upgrade receives the I/O resource `C` and must
99    /// produce a pair `(PeerId, D)`, where `D` is a new I/O resource.
100    /// The upgrade must thus at a minimum identify the remote, which typically
101    /// involves the use of a cryptographic authentication protocol in the
102    /// context of establishing a secure channel.
103    ///
104    /// ## Transitions
105    ///
106    ///   * I/O upgrade: `C -> (PeerId, D)`.
107    ///   * Transport output: `C -> (PeerId, D)`
108    pub fn authenticate<C, D, U, E>(self, upgrade: U) -> Authenticated<
109        AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
110    > where
111        T: Transport<Output = C>,
112        C: AsyncRead + AsyncWrite + Unpin,
113        D: AsyncRead + AsyncWrite + Unpin,
114        U: InboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
115        U: OutboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
116        E: Error + 'static,
117    {
118        let version = self.version;
119        Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| {
120            Authenticate {
121                inner: upgrade::apply(conn, upgrade, endpoint, version)
122            }
123        }), version))
124    }
125}
126
127/// An upgrade that authenticates the remote peer, typically
128/// in the context of negotiating a secure channel.
129///
130/// Configured through [`Builder::authenticate`].
131#[pin_project::pin_project]
132pub struct Authenticate<C, U>
133where
134    C: AsyncRead + AsyncWrite + Unpin,
135    U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>
136{
137    #[pin]
138    inner: EitherUpgrade<C, U>
139}
140
141impl<C, U> Future for Authenticate<C, U>
142where
143    C: AsyncRead + AsyncWrite + Unpin,
144    U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>,
145        Output = <U as InboundUpgrade<Negotiated<C>>>::Output,
146        Error = <U as InboundUpgrade<Negotiated<C>>>::Error
147    >
148{
149    type Output = <EitherUpgrade<C, U> as Future>::Output;
150
151    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152        let this = self.project();
153        Future::poll(this.inner, cx)
154    }
155}
156
157/// An upgrade that negotiates a (sub)stream multiplexer on
158/// top of an authenticated transport.
159///
160/// Configured through [`Authenticated::multiplex`].
161#[pin_project::pin_project]
162pub struct Multiplex<C, U>
163where
164    C: AsyncRead + AsyncWrite + Unpin,
165    U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
166{
167    peer_id: Option<PeerId>,
168    #[pin]
169    upgrade: EitherUpgrade<C, U>,
170}
171
172impl<C, U, M, E> Future for Multiplex<C, U>
173where
174    C: AsyncRead + AsyncWrite + Unpin,
175    U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
176    U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>
177{
178    type Output = Result<(PeerId, M), UpgradeError<E>>;
179
180    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181        let this = self.project();
182        let m = match ready!(Future::poll(this.upgrade, cx)) {
183            Ok(m) => m,
184            Err(err) => return Poll::Ready(Err(err)),
185        };
186        let i = this.peer_id.take().expect("Multiplex future polled after completion.");
187        Poll::Ready(Ok((i, m)))
188    }
189}
190
191/// An transport with peer authentication, obtained from [`Builder::authenticate`].
192#[derive(Clone)]
193pub struct Authenticated<T>(Builder<T>);
194
195impl<T> Authenticated<T>
196where
197    T: Transport,
198    T::Error: 'static
199{
200    /// Applies an arbitrary upgrade.
201    ///
202    /// The upgrade receives the I/O resource (i.e. connection) `C` and
203    /// must produce a new I/O resource `D`. Any number of such upgrades
204    /// can be performed.
205    ///
206    /// ## Transitions
207    ///
208    ///   * I/O upgrade: `C -> D`.
209    ///   * Transport output: `(PeerId, C) -> (PeerId, D)`.
210    pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
211    where
212        T: Transport<Output = (PeerId, C)>,
213        C: AsyncRead + AsyncWrite + Unpin,
214        D: AsyncRead + AsyncWrite + Unpin,
215        U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
216        U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
217        E: Error + 'static,
218    {
219        Authenticated(Builder::new(Upgrade::new(self.0.inner, upgrade), self.0.version))
220    }
221
222    /// Upgrades the transport with a (sub)stream multiplexer.
223    ///
224    /// The supplied upgrade receives the I/O resource `C` and must
225    /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
226    /// This ends the (regular) transport upgrade process.
227    ///
228    /// ## Transitions
229    ///
230    ///   * I/O upgrade: `C -> M`.
231    ///   * Transport output: `(PeerId, C) -> (PeerId, M)`.
232    pub fn multiplex<C, M, U, E>(self, upgrade: U) -> Multiplexed<
233        AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
234    > where
235        T: Transport<Output = (PeerId, C)>,
236        C: AsyncRead + AsyncWrite + Unpin,
237        M: StreamMuxer,
238        U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
239        U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
240        E: Error + 'static,
241    {
242        let version = self.0.version;
243        Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
244            let upgrade = upgrade::apply(c, upgrade, endpoint, version);
245            Multiplex { peer_id: Some(i), upgrade }
246        }))
247    }
248
249    /// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade.
250    ///
251    /// The supplied function is applied to [`PeerId`] and [`ConnectedPoint`]
252    /// and returns an upgrade which receives the I/O resource `C` and must
253    /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
254    /// This ends the (regular) transport upgrade process.
255    ///
256    /// ## Transitions
257    ///
258    ///   * I/O upgrade: `C -> M`.
259    ///   * Transport output: `(PeerId, C) -> (PeerId, M)`.
260    pub fn multiplex_ext<C, M, U, E, F>(self, up: F) -> Multiplexed<
261        AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
262    > where
263        T: Transport<Output = (PeerId, C)>,
264        C: AsyncRead + AsyncWrite + Unpin,
265        M: StreamMuxer,
266        U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
267        U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
268        E: Error + 'static,
269        F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone
270    {
271        let version = self.0.version;
272        Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
273            let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
274            Multiplex { peer_id: Some(peer_id), upgrade }
275        }))
276    }
277}
278
279/// A authenticated and multiplexed transport, obtained from
280/// [`Authenticated::multiplex`].
281#[derive(Clone)]
282pub struct Multiplexed<T>(T);
283
284impl<T> Multiplexed<T> {
285    /// Boxes the authenticated, multiplexed transport, including
286    /// the [`StreamMuxer`] and custom transport errors.
287    pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
288    where
289        T: Transport<Output = (PeerId, M)> + Sized + Clone + Send + Sync + 'static,
290        T::Dial: Send + 'static,
291        T::Listener: Send + 'static,
292        T::ListenerUpgrade: Send + 'static,
293        T::Error: Send + Sync,
294        M: StreamMuxer + Send + Sync + 'static,
295        M::Substream: Send + 'static,
296        M::OutboundSubstream: Send + 'static
297    {
298        boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
299    }
300
301    /// Adds a timeout to the setup and protocol upgrade process for all
302    /// inbound and outbound connections established through the transport.
303    pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
304        Multiplexed(TransportTimeout::new(self.0, timeout))
305    }
306
307    /// Adds a timeout to the setup and protocol upgrade process for all
308    /// outbound connections established through the transport.
309    pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
310        Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
311    }
312
313    /// Adds a timeout to the setup and protocol upgrade process for all
314    /// inbound connections established through the transport.
315    pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
316        Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
317    }
318}
319
320impl<T> Transport for Multiplexed<T>
321where
322    T: Transport,
323{
324    type Output = T::Output;
325    type Error = T::Error;
326    type Listener = T::Listener;
327    type ListenerUpgrade = T::ListenerUpgrade;
328    type Dial = T::Dial;
329
330    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
331        self.0.dial(addr)
332    }
333
334    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
335        self.0.listen_on(addr)
336    }
337
338    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
339        self.0.address_translation(server, observed)
340    }
341}
342
/// The future applying an upgrade `U` to a connection `C`: either the
/// inbound application (`Left`) or the outbound application (`Right`).
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
345
/// A custom upgrade on an [`Authenticated`] transport.
///
/// Pairs the transport `inner` with the `upgrade` that is applied to each
/// of its connections; see [`Authenticated::apply`], which constructs it.
#[derive(Debug, Copy, Clone)]
pub struct Upgrade<T, U> { inner: T, upgrade: U }

impl<T, U> Upgrade<T, U> {
    /// Wraps the transport `inner` so that `upgrade` is applied to its
    /// connections.
    pub fn new(inner: T, upgrade: U) -> Self {
        Self { inner, upgrade }
    }
}
357
358impl<T, C, D, U, E> Transport for Upgrade<T, U>
359where
360    T: Transport<Output = (PeerId, C)>,
361    T::Error: 'static,
362    C: AsyncRead + AsyncWrite + Unpin,
363    U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
364    U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
365    E: Error + 'static
366{
367    type Output = (PeerId, D);
368    type Error = TransportUpgradeError<T::Error, E>;
369    type Listener = ListenerStream<T::Listener, U>;
370    type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
371    type Dial = DialUpgradeFuture<T::Dial, U, C>;
372
373    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
374        let future = self.inner.dial(addr)
375            .map_err(|err| err.map(TransportUpgradeError::Transport))?;
376        Ok(DialUpgradeFuture {
377            future: Box::pin(future),
378            upgrade: future::Either::Left(Some(self.upgrade))
379        })
380    }
381
382    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
383        let stream = self.inner.listen_on(addr)
384            .map_err(|err| err.map(TransportUpgradeError::Transport))?;
385        Ok(ListenerStream {
386            stream: Box::pin(stream),
387            upgrade: self.upgrade
388        })
389    }
390
391    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
392        self.inner.address_translation(server, observed)
393    }
394}
395
396/// Errors produced by a transport upgrade.
397#[derive(Debug)]
398pub enum TransportUpgradeError<T, U> {
399    /// Error in the transport.
400    Transport(T),
401    /// Error while upgrading to a protocol.
402    Upgrade(UpgradeError<U>),
403}
404
405impl<T, U> fmt::Display for TransportUpgradeError<T, U>
406where
407    T: fmt::Display,
408    U: fmt::Display,
409{
410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411        match self {
412            TransportUpgradeError::Transport(e) => write!(f, "Transport error: {}", e),
413            TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {}", e),
414        }
415    }
416}
417
418impl<T, U> Error for TransportUpgradeError<T, U>
419where
420    T: Error + 'static,
421    U: Error + 'static,
422{
423    fn source(&self) -> Option<&(dyn Error + 'static)> {
424        match self {
425            TransportUpgradeError::Transport(e) => Some(e),
426            TransportUpgradeError::Upgrade(e) => Some(e),
427        }
428    }
429}
430
431/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
432pub struct DialUpgradeFuture<F, U, C>
433where
434    U: OutboundUpgrade<Negotiated<C>>,
435    C: AsyncRead + AsyncWrite + Unpin,
436{
437    future: Pin<Box<F>>,
438    upgrade: future::Either<Option<U>, (Option<PeerId>, OutboundUpgradeApply<C, U>)>
439}
440
441impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
442where
443    F: TryFuture<Ok = (PeerId, C)>,
444    C: AsyncRead + AsyncWrite + Unpin,
445    U: OutboundUpgrade<Negotiated<C>, Output = D>,
446    U::Error: Error
447{
448    type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
449
450    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451        // We use a `this` variable because the compiler can't mutably borrow multiple times
452        // accross a `Deref`.
453        let this = &mut *self;
454
455        loop {
456            this.upgrade = match this.upgrade {
457                future::Either::Left(ref mut up) => {
458                    let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
459                        Ok(v) => v,
460                        Err(err) => return Poll::Ready(Err(err)),
461                    };
462                    let u = up.take().expect("DialUpgradeFuture is constructed with Either::Left(Some).");
463                    future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
464                }
465                future::Either::Right((ref mut i, ref mut up)) => {
466                    let d = match ready!(Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
467                        Ok(d) => d,
468                        Err(err) => return Poll::Ready(Err(err)),
469                    };
470                    let i = i.take().expect("DialUpgradeFuture polled after completion.");
471                    return Poll::Ready(Ok((i, d)))
472                }
473            }
474        }
475    }
476}
477
478impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
479where
480    U: OutboundUpgrade<Negotiated<C>>,
481    C: AsyncRead + AsyncWrite + Unpin,
482{
483}
484
485/// The [`Transport::Listener`] stream of an [`Upgrade`]d transport.
486pub struct ListenerStream<S, U> {
487    stream: Pin<Box<S>>,
488    upgrade: U
489}
490
491impl<S, U, F, C, D, E> Stream for ListenerStream<S, U>
492where
493    S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
494    F: TryFuture<Ok = (PeerId, C)>,
495    C: AsyncRead + AsyncWrite + Unpin,
496    U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
497{
498    type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
499
500    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501        match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
502            Some(Ok(event)) => {
503                let event = event
504                    .map(move |future| {
505                        ListenerUpgradeFuture {
506                            future: Box::pin(future),
507                            upgrade: future::Either::Left(Some(self.upgrade.clone()))
508                        }
509                    })
510                    .map_err(TransportUpgradeError::Transport);
511                Poll::Ready(Some(Ok(event)))
512            }
513            Some(Err(err)) => {
514                Poll::Ready(Some(Err(TransportUpgradeError::Transport(err))))
515            }
516            None => Poll::Ready(None)
517        }
518    }
519}
520
521impl<S, U> Unpin for ListenerStream<S, U> {
522}
523
524/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport.
525pub struct ListenerUpgradeFuture<F, U, C>
526where
527    C: AsyncRead + AsyncWrite + Unpin,
528    U: InboundUpgrade<Negotiated<C>>
529{
530    future: Pin<Box<F>>,
531    upgrade: future::Either<Option<U>, (Option<PeerId>, InboundUpgradeApply<C, U>)>
532}
533
534impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
535where
536    F: TryFuture<Ok = (PeerId, C)>,
537    C: AsyncRead + AsyncWrite + Unpin,
538    U: InboundUpgrade<Negotiated<C>, Output = D>,
539    U::Error: Error
540{
541    type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
542
543    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544        // We use a `this` variable because the compiler can't mutably borrow multiple times
545        // accross a `Deref`.
546        let this = &mut *self;
547
548        loop {
549            this.upgrade = match this.upgrade {
550                future::Either::Left(ref mut up) => {
551                    let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
552                        Ok(v) => v,
553                        Err(err) => return Poll::Ready(Err(err))
554                    };
555                    let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
556                    future::Either::Right((Some(i), apply_inbound(c, u)))
557                }
558                future::Either::Right((ref mut i, ref mut up)) => {
559                    let d = match ready!(TryFuture::try_poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
560                        Ok(v) => v,
561                        Err(err) => return Poll::Ready(Err(err))
562                    };
563                    let i = i.take().expect("ListenerUpgradeFuture polled after completion.");
564                    return Poll::Ready(Ok((i, d)))
565                }
566            }
567        }
568    }
569}
570
571impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
572where
573    C: AsyncRead + AsyncWrite + Unpin,
574    U: InboundUpgrade<Negotiated<C>>
575{
576}