libp2p_core/transport/
map_err.rs1use crate::transport::{Transport, TransportError, ListenerEvent};
22use futures::prelude::*;
23use multiaddr::Multiaddr;
24use std::{error, pin::Pin, task::Context, task::Poll};
25
/// Transport adapter that passes the errors of a wrapped transport through a closure.
///
/// The `Transport` implementation forwards every call to `transport` and applies `map`
/// to any error the inner transport (or one of its listeners/futures) produces.
#[derive(Clone, Copy, Debug)]
pub struct MapErr<T, F> {
    /// The wrapped transport that calls are forwarded to.
    transport: T,
    /// Closure applied to errors produced by the wrapped transport.
    map: F,
}
32
33impl<T, F> MapErr<T, F> {
34 pub(crate) fn new(transport: T, map: F) -> MapErr<T, F> {
36 MapErr { transport, map }
37 }
38}
39
40impl<T, F, TErr> Transport for MapErr<T, F>
41where
42 T: Transport,
43 F: FnOnce(T::Error) -> TErr + Clone,
44 TErr: error::Error,
45{
46 type Output = T::Output;
47 type Error = TErr;
48 type Listener = MapErrListener<T, F>;
49 type ListenerUpgrade = MapErrListenerUpgrade<T, F>;
50 type Dial = MapErrDial<T, F>;
51
52 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
53 let map = self.map;
54 match self.transport.listen_on(addr) {
55 Ok(stream) => Ok(MapErrListener { inner: stream, map }),
56 Err(err) => Err(err.map(map))
57 }
58 }
59
60 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
61 let map = self.map;
62 match self.transport.dial(addr) {
63 Ok(future) => Ok(MapErrDial { inner: future, map: Some(map) }),
64 Err(err) => Err(err.map(map)),
65 }
66 }
67
68 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
69 self.transport.address_translation(server, observed)
70 }
71}
72
73#[pin_project::pin_project]
75pub struct MapErrListener<T: Transport, F> {
76 #[pin]
77 inner: T::Listener,
78 map: F,
79}
80
81impl<T, F, TErr> Stream for MapErrListener<T, F>
82where
83 T: Transport,
84 F: FnOnce(T::Error) -> TErr + Clone,
85 TErr: error::Error,
86{
87 type Item = Result<ListenerEvent<MapErrListenerUpgrade<T, F>, TErr>, TErr>;
88
89 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90 let this = self.project();
91 match TryStream::try_poll_next(this.inner, cx) {
92 Poll::Ready(Some(Ok(event))) => {
93 let map = &*this.map;
94 let event = event
95 .map(move |value| {
96 MapErrListenerUpgrade {
97 inner: value,
98 map: Some(map.clone())
99 }
100 })
101 .map_err(|err| (map.clone())(err));
102 Poll::Ready(Some(Ok(event)))
103 }
104 Poll::Ready(None) => Poll::Ready(None),
105 Poll::Pending => Poll::Pending,
106 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err((this.map.clone())(err)))),
107 }
108 }
109}
110
111#[pin_project::pin_project]
113pub struct MapErrListenerUpgrade<T: Transport, F> {
114 #[pin]
115 inner: T::ListenerUpgrade,
116 map: Option<F>,
117}
118
119impl<T, F, TErr> Future for MapErrListenerUpgrade<T, F>
120where T: Transport,
121 F: FnOnce(T::Error) -> TErr,
122{
123 type Output = Result<T::Output, TErr>;
124
125 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
126 let this = self.project();
127 match Future::poll(this.inner, cx) {
128 Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
129 Poll::Pending => Poll::Pending,
130 Poll::Ready(Err(err)) => {
131 let map = this.map.take().expect("poll() called again after error");
132 Poll::Ready(Err(map(err)))
133 }
134 }
135 }
136}
137
138#[pin_project::pin_project]
140pub struct MapErrDial<T: Transport, F> {
141 #[pin]
142 inner: T::Dial,
143 map: Option<F>,
144}
145
146impl<T, F, TErr> Future for MapErrDial<T, F>
147where
148 T: Transport,
149 F: FnOnce(T::Error) -> TErr,
150{
151 type Output = Result<T::Output, TErr>;
152
153 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154 let this = self.project();
155 match Future::poll(this.inner, cx) {
156 Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
157 Poll::Pending => Poll::Pending,
158 Poll::Ready(Err(err)) => {
159 let map = this.map.take().expect("poll() called again after error");
160 Poll::Ready(Err(map(err)))
161 }
162 }
163 }
164}