mwc_libp2p_core/transport/
map.rs

// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    ConnectedPoint,
23    transport::{Transport, TransportError, ListenerEvent}
24};
25use futures::prelude::*;
26use multiaddr::Multiaddr;
27use std::{pin::Pin, task::Context, task::Poll};
28
/// Transport adaptor pairing an inner transport with a mapping function.
///
/// See `Transport::map`.
#[derive(Debug, Copy, Clone)]
pub struct Map<T, F> {
    /// The wrapped transport that listening and dialing are delegated to.
    transport: T,
    /// The function handed to the futures and streams created by this adaptor.
    fun: F,
}
32
33impl<T, F> Map<T, F> {
34    pub(crate) fn new(transport: T, fun: F) -> Self {
35        Map { transport, fun }
36    }
37}
38
39impl<T, F, D> Transport for Map<T, F>
40where
41    T: Transport,
42    F: FnOnce(T::Output, ConnectedPoint) -> D + Clone
43{
44    type Output = D;
45    type Error = T::Error;
46    type Listener = MapStream<T::Listener, F>;
47    type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
48    type Dial = MapFuture<T::Dial, F>;
49
50    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
51        let stream = self.transport.listen_on(addr)?;
52        Ok(MapStream { stream, fun: self.fun })
53    }
54
55    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
56        let future = self.transport.dial(addr.clone())?;
57        let p = ConnectedPoint::Dialer { address: addr };
58        Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
59    }
60
61    fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
62        self.transport.address_translation(server, observed)
63    }
64}
65
66/// Custom `Stream` implementation to avoid boxing.
67///
68/// Maps a function over every stream item.
69#[pin_project::pin_project]
70#[derive(Clone, Debug)]
71pub struct MapStream<T, F> { #[pin] stream: T, fun: F }
72
73impl<T, F, A, B, X, E> Stream for MapStream<T, F>
74where
75    T: TryStream<Ok = ListenerEvent<X, E>, Error = E>,
76    X: TryFuture<Ok = A>,
77    F: FnOnce(A, ConnectedPoint) -> B + Clone
78{
79    type Item = Result<ListenerEvent<MapFuture<X, F>, E>, E>;
80
81    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82        let this = self.project();
83        match TryStream::try_poll_next(this.stream, cx) {
84            Poll::Ready(Some(Ok(event))) => {
85                let event = match event {
86                    ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
87                        let point = ConnectedPoint::Listener {
88                            local_addr: local_addr.clone(),
89                            send_back_addr: remote_addr.clone()
90                        };
91                        ListenerEvent::Upgrade {
92                            upgrade: MapFuture {
93                                inner: upgrade,
94                                args: Some((this.fun.clone(), point))
95                            },
96                            local_addr,
97                            remote_addr
98                        }
99                    }
100                    ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
101                    ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
102                    ListenerEvent::Error(e) => ListenerEvent::Error(e),
103                };
104                Poll::Ready(Some(Ok(event)))
105            }
106            Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
107            Poll::Ready(None) => Poll::Ready(None),
108            Poll::Pending => Poll::Pending
109        }
110    }
111}
112
113/// Custom `Future` to avoid boxing.
114///
115/// Applies a function to the inner future's result.
116#[pin_project::pin_project]
117#[derive(Clone, Debug)]
118pub struct MapFuture<T, F> {
119    #[pin]
120    inner: T,
121    args: Option<(F, ConnectedPoint)>
122}
123
124impl<T, A, F, B> Future for MapFuture<T, F>
125where
126    T: TryFuture<Ok = A>,
127    F: FnOnce(A, ConnectedPoint) -> B
128{
129    type Output = Result<B, T::Error>;
130
131    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132        let this = self.project();
133        let item = match TryFuture::try_poll(this.inner, cx) {
134            Poll::Pending => return Poll::Pending,
135            Poll::Ready(Ok(v)) => v,
136            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
137        };
138        let (f, a) = this.args.take().expect("MapFuture has already finished.");
139        Poll::Ready(Ok(f(item, a)))
140    }
141}