// volans_core/transport/map.rs

1use crate::{ConnectedPoint, Listener, ListenerEvent, Multiaddr, Transport, TransportError};
2use futures::TryFuture;
3use std::{
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
/// Transport adapter that post-processes the output of a wrapped transport.
///
/// When used through its `Transport` implementation, every dial or incoming
/// upgrade that resolves successfully has its output handed, together with the
/// `ConnectedPoint` describing the connection, to the stored closure.
#[derive(Clone, Copy, Debug)]
pub struct Map<T, TMap> {
    /// The wrapped transport that performs the actual dialing and listening.
    transport: T,
    /// The mapping closure; a clone of it is handed to each dial future and
    /// to each listener created from this adapter.
    map: TMap,
}
14
15impl<T, TMap> Map<T, TMap> {
16    pub(crate) fn new(transport: T, map: TMap) -> Self {
17        Map { transport, map }
18    }
19
20    pub fn inner(&self) -> &T {
21        &self.transport
22    }
23}
24
25impl<T, D, TMap> Transport for Map<T, TMap>
26where
27    T: Transport,
28    TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
29{
30    type Output = D;
31    type Error = T::Error;
32    type Dial = MapFuture<T::Dial, TMap>;
33    type Incoming = MapFuture<T::Incoming, TMap>;
34    type Listener = MapListener<T, TMap>;
35
36    fn dial(&self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
37        match self.transport.dial(addr.clone()) {
38            Ok(dial) => Ok(MapFuture {
39                inner: dial,
40                args: Some((self.map.clone(), ConnectedPoint::Dialer { addr })),
41            }),
42            Err(err) => Err(err),
43        }
44    }
45
46    fn listen(&self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
47        match self.transport.listen(addr) {
48            Ok(listener) => Ok(MapListener {
49                inner: listener,
50                fun: self.map.clone(),
51                _phantom: PhantomData,
52            }),
53            Err(err) => Err(err),
54        }
55    }
56}
57
58#[pin_project::pin_project]
59#[derive(Clone, Debug)]
60pub struct MapListener<T, TMap>
61where
62    T: Transport,
63{
64    #[pin]
65    inner: T::Listener,
66    fun: TMap,
67    _phantom: PhantomData<T>,
68}
69
70impl<D, T, TMap> Listener for MapListener<T, TMap>
71where
72    T: Transport,
73    TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
74{
75    type Output = D;
76    type Error = T::Error;
77    type Upgrade = MapFuture<T::Incoming, TMap>;
78
79    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
80        let this = self.project();
81        this.inner.poll_close(cx)
82    }
83
84    fn poll_event(
85        self: Pin<&mut Self>,
86        cx: &mut Context<'_>,
87    ) -> Poll<ListenerEvent<Self::Upgrade, Self::Error>> {
88        let this = self.project();
89        this.inner.poll_event(cx).map(|event| match event {
90            ListenerEvent::Incoming {
91                local_addr,
92                remote_addr,
93                upgrade,
94            } => ListenerEvent::Incoming {
95                local_addr: local_addr.clone(),
96                remote_addr: remote_addr.clone(),
97                upgrade: MapFuture {
98                    inner: upgrade,
99                    args: Some((
100                        this.fun.clone(),
101                        ConnectedPoint::Listener {
102                            local_addr,
103                            remote_addr,
104                        },
105                    )),
106                },
107            },
108            ListenerEvent::Closed(cause) => ListenerEvent::Closed(cause),
109            ListenerEvent::NewAddress(addr) => ListenerEvent::NewAddress(addr),
110            ListenerEvent::AddressExpired(addr) => ListenerEvent::AddressExpired(addr),
111            ListenerEvent::Error(err) => ListenerEvent::Error(err),
112        })
113    }
114}
115
116#[pin_project::pin_project]
117#[derive(Debug)]
118pub struct MapFuture<T, F> {
119    #[pin]
120    inner: T,
121    args: Option<(F, ConnectedPoint)>,
122}
123
124impl<T, A, F, B> Future for MapFuture<T, F>
125where
126    T: TryFuture<Ok = A>,
127    F: FnOnce(A, ConnectedPoint) -> B,
128{
129    type Output = Result<B, T::Error>;
130
131    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
132        let this = self.project();
133        let item = match TryFuture::try_poll(this.inner, cx) {
134            Poll::Pending => return Poll::Pending,
135            Poll::Ready(Ok(v)) => v,
136            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
137        };
138        let (f, a) = this.args.take().expect("MapFuture has already finished.");
139        Poll::Ready(Ok(f(item, a)))
140    }
141}