1use crate::{ConnectedPoint, Listener, ListenerEvent, Multiaddr, Transport, TransportError};
2use futures::TryFuture;
3use std::{
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9#[derive(Debug, Copy, Clone)]
10pub struct Map<T, F> {
11 transport: T,
12 map: F,
13}
14
15impl<T, TMap> Map<T, TMap> {
16 pub(crate) fn new(transport: T, map: TMap) -> Self {
17 Map { transport, map }
18 }
19
20 pub fn inner(&self) -> &T {
21 &self.transport
22 }
23}
24
25impl<T, D, TMap> Transport for Map<T, TMap>
26where
27 T: Transport,
28 TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
29{
30 type Output = D;
31 type Error = T::Error;
32 type Dial = MapFuture<T::Dial, TMap>;
33 type Incoming = MapFuture<T::Incoming, TMap>;
34 type Listener = MapListener<T, TMap>;
35
36 fn dial(&self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
37 match self.transport.dial(addr.clone()) {
38 Ok(dial) => Ok(MapFuture {
39 inner: dial,
40 args: Some((self.map.clone(), ConnectedPoint::Dialer { addr })),
41 }),
42 Err(err) => Err(err),
43 }
44 }
45
46 fn listen(&self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
47 match self.transport.listen(addr) {
48 Ok(listener) => Ok(MapListener {
49 inner: listener,
50 fun: self.map.clone(),
51 _phantom: PhantomData,
52 }),
53 Err(err) => Err(err),
54 }
55 }
56}
57
58#[pin_project::pin_project]
59#[derive(Clone, Debug)]
60pub struct MapListener<T, TMap>
61where
62 T: Transport,
63{
64 #[pin]
65 inner: T::Listener,
66 fun: TMap,
67 _phantom: PhantomData<T>,
68}
69
70impl<D, T, TMap> Listener for MapListener<T, TMap>
71where
72 T: Transport,
73 TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
74{
75 type Output = D;
76 type Error = T::Error;
77 type Upgrade = MapFuture<T::Incoming, TMap>;
78
79 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80 let this = self.project();
81 this.inner.poll_close(cx)
82 }
83
84 fn poll_event(
85 self: Pin<&mut Self>,
86 cx: &mut Context<'_>,
87 ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
88 let this = self.project();
89 this.inner.poll_event(cx).map(|event| match event {
90 ListenerEvent::Incoming {
91 local_addr,
92 remote_addr,
93 upgrade,
94 } => ListenerEvent::Incoming {
95 local_addr: local_addr.clone(),
96 remote_addr: remote_addr.clone(),
97 upgrade: MapFuture {
98 inner: upgrade,
99 args: Some((
100 this.fun.clone(),
101 ConnectedPoint::Listener {
102 local_addr,
103 remote_addr,
104 },
105 )),
106 },
107 },
108 ListenerEvent::Closed(cause) => ListenerEvent::Closed(cause),
109 ListenerEvent::NewAddress(addr) => ListenerEvent::NewAddress(addr),
110 ListenerEvent::AddressExpired(addr) => ListenerEvent::AddressExpired(addr),
111 ListenerEvent::Error(err) => ListenerEvent::Error(err),
112 })
113 }
114}
115
116#[pin_project::pin_project]
117#[derive(Debug)]
118pub struct MapFuture<T, F> {
119 #[pin]
120 inner: T,
121 args: Option<(F, ConnectedPoint)>,
122}
123
124impl<T, A, F, B> Future for MapFuture<T, F>
125where
126 T: TryFuture<Ok = A>,
127 F: FnOnce(A, ConnectedPoint) -> B,
128{
129 type Output = Result<B, T::Error>;
130
131 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132 let this = self.project();
133 let item = match TryFuture::try_poll(this.inner, cx) {
134 Poll::Pending => return Poll::Pending,
135 Poll::Ready(Ok(v)) => v,
136 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
137 };
138 let (f, a) = this.args.take().expect("MapFuture has already finished.");
139 Poll::Ready(Ok(f(item, a)))
140 }
141}