libp2p_core/transport/
boxed.rs1use crate::transport::{ListenerEvent, Transport, TransportError};
22use futures::prelude::*;
23use multiaddr::Multiaddr;
24use std::{error::Error, fmt, io, pin::Pin, sync::Arc};
25
26pub fn boxed<T>(transport: T) -> Boxed<T::Output>
28where
29 T: Transport + Clone + Send + Sync + 'static,
30 T::Error: Send + Sync,
31 T::Dial: Send + 'static,
32 T::Listener: Send + 'static,
33 T::ListenerUpgrade: Send + 'static,
34{
35 Boxed {
36 inner: Arc::new(transport) as Arc<_>,
37 }
38}
39
40pub struct Boxed<O> {
44 inner: Arc<dyn Abstract<O> + Send + Sync>,
45}
46
47type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
48type Listener<O> = Pin<Box<dyn Stream<Item = io::Result<ListenerEvent<ListenerUpgrade<O>, io::Error>>> + Send>>;
49type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
50
51trait Abstract<O> {
52 fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
53 fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
54 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
55}
56
57impl<T, O> Abstract<O> for T
58where
59 T: Transport<Output = O> + Clone + 'static,
60 T::Error: Send + Sync,
61 T::Dial: Send + 'static,
62 T::Listener: Send + 'static,
63 T::ListenerUpgrade: Send + 'static,
64{
65 fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>> {
66 let listener = Transport::listen_on(self.clone(), addr).map_err(|e| e.map(box_err))?;
67 let fut = listener.map_ok(|event|
68 event.map(|upgrade| {
69 let up = upgrade.map_err(box_err);
70 Box::pin(up) as ListenerUpgrade<O>
71 }).map_err(box_err)
72 ).map_err(box_err);
73 Ok(Box::pin(fut))
74 }
75
76 fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>> {
77 let fut = Transport::dial(self.clone(), addr)
78 .map(|r| r.map_err(box_err))
79 .map_err(|e| e.map(box_err))?;
80 Ok(Box::pin(fut) as Dial<_>)
81 }
82
83 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
84 Transport::address_translation(self, server, observed)
85 }
86}
87
88impl<O> fmt::Debug for Boxed<O> {
89 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90 write!(f, "BoxedTransport")
91 }
92}
93
94impl<O> Clone for Boxed<O> {
95 fn clone(&self) -> Self {
96 Boxed {
97 inner: self.inner.clone(),
98 }
99 }
100}
101
102impl<O> Transport for Boxed<O> {
103 type Output = O;
104 type Error = io::Error;
105 type Listener = Listener<O>;
106 type ListenerUpgrade = ListenerUpgrade<O>;
107 type Dial = Dial<O>;
108
109 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
110 self.inner.listen_on(addr)
111 }
112
113 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
114 self.inner.dial(addr)
115 }
116
117 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
118 self.inner.address_translation(server, observed)
119 }
120}
121
122fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
123 io::Error::new(io::ErrorKind::Other, e)
124}