1use std::{
2 marker::PhantomData,
3 net::SocketAddr,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::{Stream, TryFuture};
9
10use crate::{ConnectedPoint, ListenerEvent, Transport};
11
12#[derive(Debug, Copy, Clone)]
14#[pin_project::pin_project]
15pub struct Map<T, TMap> {
16 #[pin]
17 transport: T,
18 fun: TMap,
19}
20
21impl<T, TMap> Map<T, TMap> {
22 pub(crate) fn new(transport: T, fun: TMap) -> Self {
23 Map { transport, fun }
24 }
25
26 pub fn inner(&self) -> &T {
27 &self.transport
28 }
29}
30
31impl<T, TMap, O> Transport for Map<T, TMap>
32where
33 T: Transport,
34 TMap: FnOnce(T::Output, ConnectedPoint) -> O + Clone,
35{
36 type Output = O;
37 type Error = T::Error;
38 type ListenerUpgrade = MapFuture<T::ListenerUpgrade, TMap>;
39 type Dialer = MapFuture<T::Dialer, TMap>;
40 type Listener = MapListener<T, TMap>;
41
42 fn connect(&self, addr: SocketAddr) -> Result<Self::Dialer, Self::Error> {
43 let dialer = self.transport.connect(addr)?;
44 let connected_point = ConnectedPoint::Dialer { addr };
45 Ok(MapFuture {
46 inner: dialer,
47 args: Some((self.fun.clone(), connected_point)),
48 })
49 }
50
51 fn listen(&self, addr: SocketAddr) -> Result<Self::Listener, Self::Error> {
52 let listener = self.transport.listen(addr)?;
53 Ok(MapListener {
54 inner: listener,
55 fun: self.fun.clone(),
56 _phantom: PhantomData,
57 })
58 }
59}
60
61#[pin_project::pin_project]
62#[derive(Clone, Debug)]
63pub struct MapListener<T, TMap>
64where
65 T: Transport,
66{
67 #[pin]
68 inner: T::Listener,
69 fun: TMap,
70 _phantom: PhantomData<T>,
71}
72
73impl<D, T, TMap> Stream for MapListener<T, TMap>
74where
75 T: Transport,
76 TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
77{
78 type Item = ListenerEvent<MapFuture<T::ListenerUpgrade, TMap>, T::Error>;
79
80 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81 let mut this = self.project();
82 let event = match Pin::new(&mut this.inner).as_mut().poll_next(cx) {
83 Poll::Ready(Some(event)) => match event {
84 ListenerEvent::Listened(addr) => ListenerEvent::Listened(addr),
85 ListenerEvent::Incoming {
86 local_addr,
87 remote_addr,
88 upgrade,
89 } => ListenerEvent::Incoming {
90 local_addr,
91 remote_addr,
92 upgrade: MapFuture {
93 inner: upgrade,
94 args: Some((
95 this.fun.clone(),
96 ConnectedPoint::Listener {
97 local_addr,
98 remote_addr,
99 },
100 )),
101 },
102 },
103 ListenerEvent::Closed(result) => ListenerEvent::Closed(result),
104 ListenerEvent::Error(err) => ListenerEvent::Error(err),
105 },
106 Poll::Ready(None) => return Poll::Ready(None),
107 Poll::Pending => return Poll::Pending,
108 };
109
110 return Poll::Ready(Some(event));
111 }
112}
113
114#[pin_project::pin_project]
115#[derive(Clone, Debug)]
116pub struct MapFuture<T, F> {
117 #[pin]
118 inner: T,
119 args: Option<(F, ConnectedPoint)>,
120}
121
122impl<T, A, F, B> Future for MapFuture<T, F>
123where
124 T: TryFuture<Ok = A>,
125 F: FnOnce(A, ConnectedPoint) -> B,
126{
127 type Output = Result<B, T::Error>;
128
129 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130 let this = self.project();
131 let item = match TryFuture::try_poll(this.inner, cx) {
132 Poll::Pending => return Poll::Pending,
133 Poll::Ready(Ok(v)) => v,
134 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
135 };
136 let (f, a) = this.args.take().expect("MapFuture has already finished.");
137 Poll::Ready(Ok(f(item, a)))
138 }
139}