libp2p_core/transport/
boxed.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::transport::{ListenerEvent, Transport, TransportError};
22use futures::prelude::*;
23use multiaddr::Multiaddr;
24use std::{error::Error, fmt, io, pin::Pin, sync::Arc};
25
26/// Creates a new [`Boxed`] transport from the given transport.
27pub fn boxed<T>(transport: T) -> Boxed<T::Output>
28where
29    T: Transport + Clone + Send + Sync + 'static,
30    T::Error: Send + Sync,
31    T::Dial: Send + 'static,
32    T::Listener: Send + 'static,
33    T::ListenerUpgrade: Send + 'static,
34{
35    Boxed {
36        inner: Arc::new(transport) as Arc<_>,
37    }
38}
39
40/// A `Boxed` transport is a `Transport` whose `Dial`, `Listener`
41/// and `ListenerUpgrade` futures are `Box`ed and only the `Output`
42/// and `Error` types are captured in type variables.
43pub struct Boxed<O> {
44    inner: Arc<dyn Abstract<O> + Send + Sync>,
45}
46
47type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
48type Listener<O> = Pin<Box<dyn Stream<Item = io::Result<ListenerEvent<ListenerUpgrade<O>, io::Error>>> + Send>>;
49type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
50
51trait Abstract<O> {
52    fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
53    fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
54    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
55}
56
57impl<T, O> Abstract<O> for T
58where
59    T: Transport<Output = O> + Clone + 'static,
60    T::Error: Send + Sync,
61    T::Dial: Send + 'static,
62    T::Listener: Send + 'static,
63    T::ListenerUpgrade: Send + 'static,
64{
65    fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>> {
66        let listener = Transport::listen_on(self.clone(), addr).map_err(|e| e.map(box_err))?;
67        let fut = listener.map_ok(|event|
68            event.map(|upgrade| {
69                let up = upgrade.map_err(box_err);
70                Box::pin(up) as ListenerUpgrade<O>
71            }).map_err(box_err)
72        ).map_err(box_err);
73        Ok(Box::pin(fut))
74    }
75
76    fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>> {
77        let fut = Transport::dial(self.clone(), addr)
78            .map(|r| r.map_err(box_err))
79            .map_err(|e| e.map(box_err))?;
80        Ok(Box::pin(fut) as Dial<_>)
81    }
82
83    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
84        Transport::address_translation(self, server, observed)
85    }
86}
87
88impl<O> fmt::Debug for Boxed<O> {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        write!(f, "BoxedTransport")
91    }
92}
93
94impl<O> Clone for Boxed<O> {
95    fn clone(&self) -> Self {
96        Boxed {
97            inner: self.inner.clone(),
98        }
99    }
100}
101
102impl<O> Transport for Boxed<O> {
103    type Output = O;
104    type Error = io::Error;
105    type Listener = Listener<O>;
106    type ListenerUpgrade = ListenerUpgrade<O>;
107    type Dial = Dial<O>;
108
109    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
110        self.inner.listen_on(addr)
111    }
112
113    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
114        self.inner.dial(addr)
115    }
116
117    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
118        self.inner.address_translation(server, observed)
119    }
120}
121
122fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
123    io::Error::new(io::ErrorKind::Other, e)
124}