mwc_libp2p_core/upgrade/
map.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
22use futures::prelude::*;
23use std::{pin::Pin, task::Context, task::Poll};
24
25/// Wraps around an upgrade and applies a closure to the output.
26#[derive(Debug, Clone)]
27pub struct MapInboundUpgrade<U, F> { upgrade: U, fun: F }
28
29impl<U, F> MapInboundUpgrade<U, F> {
30    pub fn new(upgrade: U, fun: F) -> Self {
31        MapInboundUpgrade { upgrade, fun }
32    }
33}
34
35impl<U, F> UpgradeInfo for MapInboundUpgrade<U, F>
36where
37    U: UpgradeInfo
38{
39    type Info = U::Info;
40    type InfoIter = U::InfoIter;
41
42    fn protocol_info(&self) -> Self::InfoIter {
43        self.upgrade.protocol_info()
44    }
45}
46
47impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgrade<U, F>
48where
49    U: InboundUpgrade<C>,
50    F: FnOnce(U::Output) -> T
51{
52    type Output = T;
53    type Error = U::Error;
54    type Future = MapFuture<U::Future, F>;
55
56    fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
57        MapFuture {
58            inner: self.upgrade.upgrade_inbound(sock, info),
59            map: Some(self.fun)
60        }
61    }
62}
63
64impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgrade<U, F>
65where
66    U: OutboundUpgrade<C>,
67{
68    type Output = U::Output;
69    type Error = U::Error;
70    type Future = U::Future;
71
72    fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
73        self.upgrade.upgrade_outbound(sock, info)
74    }
75}
76
77/// Wraps around an upgrade and applies a closure to the output.
78#[derive(Debug, Clone)]
79pub struct MapOutboundUpgrade<U, F> { upgrade: U, fun: F }
80
81impl<U, F> MapOutboundUpgrade<U, F> {
82    pub fn new(upgrade: U, fun: F) -> Self {
83        MapOutboundUpgrade { upgrade, fun }
84    }
85}
86
87impl<U, F> UpgradeInfo for MapOutboundUpgrade<U, F>
88where
89    U: UpgradeInfo
90{
91    type Info = U::Info;
92    type InfoIter = U::InfoIter;
93
94    fn protocol_info(&self) -> Self::InfoIter {
95        self.upgrade.protocol_info()
96    }
97}
98
99impl<C, U, F> InboundUpgrade<C> for MapOutboundUpgrade<U, F>
100where
101    U: InboundUpgrade<C>,
102{
103    type Output = U::Output;
104    type Error = U::Error;
105    type Future = U::Future;
106
107    fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
108        self.upgrade.upgrade_inbound(sock, info)
109    }
110}
111
112impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgrade<U, F>
113where
114    U: OutboundUpgrade<C>,
115    F: FnOnce(U::Output) -> T
116{
117    type Output = T;
118    type Error = U::Error;
119    type Future = MapFuture<U::Future, F>;
120
121    fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
122        MapFuture {
123            inner: self.upgrade.upgrade_outbound(sock, info),
124            map: Some(self.fun)
125        }
126    }
127}
128
129/// Wraps around an upgrade and applies a closure to the error.
130#[derive(Debug, Clone)]
131pub struct MapInboundUpgradeErr<U, F> { upgrade: U, fun: F }
132
133impl<U, F> MapInboundUpgradeErr<U, F> {
134    pub fn new(upgrade: U, fun: F) -> Self {
135        MapInboundUpgradeErr { upgrade, fun }
136    }
137}
138
139impl<U, F> UpgradeInfo for MapInboundUpgradeErr<U, F>
140where
141    U: UpgradeInfo
142{
143    type Info = U::Info;
144    type InfoIter = U::InfoIter;
145
146    fn protocol_info(&self) -> Self::InfoIter {
147        self.upgrade.protocol_info()
148    }
149}
150
151impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgradeErr<U, F>
152where
153    U: InboundUpgrade<C>,
154    F: FnOnce(U::Error) -> T
155{
156    type Output = U::Output;
157    type Error = T;
158    type Future = MapErrFuture<U::Future, F>;
159
160    fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
161        MapErrFuture {
162            fut: self.upgrade.upgrade_inbound(sock, info),
163            fun: Some(self.fun)
164        }
165    }
166}
167
168impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgradeErr<U, F>
169where
170    U: OutboundUpgrade<C>,
171{
172    type Output = U::Output;
173    type Error = U::Error;
174    type Future = U::Future;
175
176    fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
177        self.upgrade.upgrade_outbound(sock, info)
178    }
179}
180
181/// Wraps around an upgrade and applies a closure to the error.
182#[derive(Debug, Clone)]
183pub struct MapOutboundUpgradeErr<U, F> { upgrade: U, fun: F }
184
185impl<U, F> MapOutboundUpgradeErr<U, F> {
186    pub fn new(upgrade: U, fun: F) -> Self {
187        MapOutboundUpgradeErr { upgrade, fun }
188    }
189}
190
191impl<U, F> UpgradeInfo for MapOutboundUpgradeErr<U, F>
192where
193    U: UpgradeInfo
194{
195    type Info = U::Info;
196    type InfoIter = U::InfoIter;
197
198    fn protocol_info(&self) -> Self::InfoIter {
199        self.upgrade.protocol_info()
200    }
201}
202
203impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgradeErr<U, F>
204where
205    U: OutboundUpgrade<C>,
206    F: FnOnce(U::Error) -> T
207{
208    type Output = U::Output;
209    type Error = T;
210    type Future = MapErrFuture<U::Future, F>;
211
212    fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
213        MapErrFuture {
214            fut: self.upgrade.upgrade_outbound(sock, info),
215            fun: Some(self.fun)
216        }
217    }
218}
219
220impl<C, U, F> InboundUpgrade<C> for MapOutboundUpgradeErr<U, F>
221where
222    U: InboundUpgrade<C>
223{
224    type Output = U::Output;
225    type Error = U::Error;
226    type Future = U::Future;
227
228    fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
229        self.upgrade.upgrade_inbound(sock, info)
230    }
231}
232
233#[pin_project::pin_project]
234pub struct MapFuture<TInnerFut, TMap> {
235    #[pin]
236    inner: TInnerFut,
237    map: Option<TMap>,
238}
239
240impl<TInnerFut, TIn, TMap, TOut> Future for MapFuture<TInnerFut, TMap>
241where
242    TInnerFut: TryFuture<Ok = TIn>,
243    TMap: FnOnce(TIn) -> TOut,
244{
245    type Output = Result<TOut, TInnerFut::Error>;
246
247    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248        let this = self.project();
249        let item = match TryFuture::try_poll(this.inner, cx) {
250            Poll::Ready(Ok(v)) => v,
251            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
252            Poll::Pending => return Poll::Pending,
253        };
254
255        let map = this.map.take().expect("Future has already finished");
256        Poll::Ready(Ok(map(item)))
257    }
258}
259
260#[pin_project::pin_project]
261pub struct MapErrFuture<T, F> {
262    #[pin]
263    fut: T,
264    fun: Option<F>,
265}
266
267impl<T, E, F, A> Future for MapErrFuture<T, F>
268where
269    T: TryFuture<Error = E>,
270    F: FnOnce(E) -> A,
271{
272    type Output = Result<T::Ok, A>;
273
274    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275        let this = self.project();
276        match TryFuture::try_poll(this.fut, cx) {
277            Poll::Pending => Poll::Pending,
278            Poll::Ready(Ok(x)) => Poll::Ready(Ok(x)),
279            Poll::Ready(Err(e)) => {
280                let f = this.fun.take().expect("Future has not resolved yet");
281                Poll::Ready(Err(f(e)))
282            }
283        }
284    }
285}
286