mwc_libp2p_core/transport/
map.rs1use crate::{
22 ConnectedPoint,
23 transport::{Transport, TransportError, ListenerEvent}
24};
25use futures::prelude::*;
26use multiaddr::Multiaddr;
27use std::{pin::Pin, task::Context, task::Poll};
28
29#[derive(Debug, Copy, Clone)]
31pub struct Map<T, F> { transport: T, fun: F }
32
33impl<T, F> Map<T, F> {
34 pub(crate) fn new(transport: T, fun: F) -> Self {
35 Map { transport, fun }
36 }
37}
38
39impl<T, F, D> Transport for Map<T, F>
40where
41 T: Transport,
42 F: FnOnce(T::Output, ConnectedPoint) -> D + Clone
43{
44 type Output = D;
45 type Error = T::Error;
46 type Listener = MapStream<T::Listener, F>;
47 type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
48 type Dial = MapFuture<T::Dial, F>;
49
50 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
51 let stream = self.transport.listen_on(addr)?;
52 Ok(MapStream { stream, fun: self.fun })
53 }
54
55 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
56 let future = self.transport.dial(addr.clone())?;
57 let p = ConnectedPoint::Dialer { address: addr };
58 Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
59 }
60
61 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
62 self.transport.address_translation(server, observed)
63 }
64}
65
66#[pin_project::pin_project]
70#[derive(Clone, Debug)]
71pub struct MapStream<T, F> { #[pin] stream: T, fun: F }
72
73impl<T, F, A, B, X, E> Stream for MapStream<T, F>
74where
75 T: TryStream<Ok = ListenerEvent<X, E>, Error = E>,
76 X: TryFuture<Ok = A>,
77 F: FnOnce(A, ConnectedPoint) -> B + Clone
78{
79 type Item = Result<ListenerEvent<MapFuture<X, F>, E>, E>;
80
81 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
82 let this = self.project();
83 match TryStream::try_poll_next(this.stream, cx) {
84 Poll::Ready(Some(Ok(event))) => {
85 let event = match event {
86 ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
87 let point = ConnectedPoint::Listener {
88 local_addr: local_addr.clone(),
89 send_back_addr: remote_addr.clone()
90 };
91 ListenerEvent::Upgrade {
92 upgrade: MapFuture {
93 inner: upgrade,
94 args: Some((this.fun.clone(), point))
95 },
96 local_addr,
97 remote_addr
98 }
99 }
100 ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
101 ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
102 ListenerEvent::Error(e) => ListenerEvent::Error(e),
103 };
104 Poll::Ready(Some(Ok(event)))
105 }
106 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
107 Poll::Ready(None) => Poll::Ready(None),
108 Poll::Pending => Poll::Pending
109 }
110 }
111}
112
113#[pin_project::pin_project]
117#[derive(Clone, Debug)]
118pub struct MapFuture<T, F> {
119 #[pin]
120 inner: T,
121 args: Option<(F, ConnectedPoint)>
122}
123
124impl<T, A, F, B> Future for MapFuture<T, F>
125where
126 T: TryFuture<Ok = A>,
127 F: FnOnce(A, ConnectedPoint) -> B
128{
129 type Output = Result<B, T::Error>;
130
131 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132 let this = self.project();
133 let item = match TryFuture::try_poll(this.inner, cx) {
134 Poll::Pending => return Poll::Pending,
135 Poll::Ready(Ok(v)) => v,
136 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
137 };
138 let (f, a) = this.args.take().expect("MapFuture has already finished.");
139 Poll::Ready(Ok(f(item, a)))
140 }
141}