Skip to main content

airio_core/transport/
map.rs

1use std::{
2    marker::PhantomData,
3    net::SocketAddr,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures::{Stream, TryFuture};
9
10use crate::{ConnectedPoint, ListenerEvent, Transport};
11
12/// 将 Transport 的输出使用函数映射到另一个类型。
13#[derive(Debug, Copy, Clone)]
14#[pin_project::pin_project]
15pub struct Map<T, TMap> {
16    #[pin]
17    transport: T,
18    fun: TMap,
19}
20
21impl<T, TMap> Map<T, TMap> {
22    pub(crate) fn new(transport: T, fun: TMap) -> Self {
23        Map { transport, fun }
24    }
25
26    pub fn inner(&self) -> &T {
27        &self.transport
28    }
29}
30
31impl<T, TMap, O> Transport for Map<T, TMap>
32where
33    T: Transport,
34    TMap: FnOnce(T::Output, ConnectedPoint) -> O + Clone,
35{
36    type Output = O;
37    type Error = T::Error;
38    type ListenerUpgrade = MapFuture<T::ListenerUpgrade, TMap>;
39    type Dialer = MapFuture<T::Dialer, TMap>;
40    type Listener = MapListener<T, TMap>;
41
42    fn connect(&self, addr: SocketAddr) -> Result<Self::Dialer, Self::Error> {
43        let dialer = self.transport.connect(addr)?;
44        let connected_point = ConnectedPoint::Dialer { addr };
45        Ok(MapFuture {
46            inner: dialer,
47            args: Some((self.fun.clone(), connected_point)),
48        })
49    }
50
51    fn listen(&self, addr: SocketAddr) -> Result<Self::Listener, Self::Error> {
52        let listener = self.transport.listen(addr)?;
53        Ok(MapListener {
54            inner: listener,
55            fun: self.fun.clone(),
56            _phantom: PhantomData,
57        })
58    }
59}
60
61#[pin_project::pin_project]
62#[derive(Clone, Debug)]
63pub struct MapListener<T, TMap>
64where
65    T: Transport,
66{
67    #[pin]
68    inner: T::Listener,
69    fun: TMap,
70    _phantom: PhantomData<T>,
71}
72
73impl<D, T, TMap> Stream for MapListener<T, TMap>
74where
75    T: Transport,
76    TMap: FnOnce(T::Output, ConnectedPoint) -> D + Clone,
77{
78    type Item = ListenerEvent<MapFuture<T::ListenerUpgrade, TMap>, T::Error>;
79
80    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81        let mut this = self.project();
82        let event = match Pin::new(&mut this.inner).as_mut().poll_next(cx) {
83            Poll::Ready(Some(event)) => match event {
84                ListenerEvent::Listened(addr) => ListenerEvent::Listened(addr),
85                ListenerEvent::Incoming {
86                    local_addr,
87                    remote_addr,
88                    upgrade,
89                } => ListenerEvent::Incoming {
90                    local_addr,
91                    remote_addr,
92                    upgrade: MapFuture {
93                        inner: upgrade,
94                        args: Some((
95                            this.fun.clone(),
96                            ConnectedPoint::Listener {
97                                local_addr,
98                                remote_addr,
99                            },
100                        )),
101                    },
102                },
103                ListenerEvent::Closed(result) => ListenerEvent::Closed(result),
104                ListenerEvent::Error(err) => ListenerEvent::Error(err),
105            },
106            Poll::Ready(None) => return Poll::Ready(None),
107            Poll::Pending => return Poll::Pending,
108        };
109
110        return Poll::Ready(Some(event));
111    }
112}
113
114#[pin_project::pin_project]
115#[derive(Clone, Debug)]
116pub struct MapFuture<T, F> {
117    #[pin]
118    inner: T,
119    args: Option<(F, ConnectedPoint)>,
120}
121
122impl<T, A, F, B> Future for MapFuture<T, F>
123where
124    T: TryFuture<Ok = A>,
125    F: FnOnce(A, ConnectedPoint) -> B,
126{
127    type Output = Result<B, T::Error>;
128
129    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
130        let this = self.project();
131        let item = match TryFuture::try_poll(this.inner, cx) {
132            Poll::Pending => return Poll::Pending,
133            Poll::Ready(Ok(v)) => v,
134            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
135        };
136        let (f, a) = this.args.take().expect("MapFuture has already finished.");
137        Poll::Ready(Ok(f(item, a)))
138    }
139}