volans_swarm/handler/
map.rs

1use std::{
2    fmt,
3    marker::PhantomData,
4    task::{Context, Poll},
5};
6
7use crate::{
8    ConnectionHandler, ConnectionHandlerEvent, InboundStreamHandler, InboundUpgradeSend,
9    OutboundStreamHandler, OutboundUpgradeSend, StreamUpgradeError, SubstreamProtocol,
10};
11
/// Adapter that pairs a handler with a mapping applied to the handler's events.
///
/// The wrapper itself only stores the two values; the trait implementations
/// for this type forward calls to `inner` and run emitted events through `map`.
#[derive(Debug)]
pub struct MapEvent<THandler, TMap> {
    /// The wrapped handler that every call is forwarded to.
    inner: THandler,
    /// The mapping invoked on each event produced by `inner`.
    map: TMap,
}

impl<THandler, TMap> MapEvent<THandler, TMap> {
    /// Wraps `inner`, storing `map` alongside it for later event conversion.
    pub(crate) fn new(inner: THandler, map: TMap) -> Self {
        MapEvent { inner, map }
    }
}
23
24impl<THandler, O, TMap> ConnectionHandler for MapEvent<THandler, TMap>
25where
26    THandler: ConnectionHandler,
27    O: fmt::Debug + Send + Clone + 'static,
28    TMap: Send + 'static,
29    TMap: FnMut(THandler::Event) -> O,
30{
31    type Action = THandler::Action;
32    type Event = O;
33
34    fn handle_action(&mut self, action: Self::Action) {
35        self.inner.handle_action(action);
36    }
37
38    fn connection_keep_alive(&self) -> bool {
39        self.inner.connection_keep_alive()
40    }
41
42    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Event>> {
43        self.inner.poll_close(cx).map(|e| e.map(|e| (self.map)(e)))
44    }
45
46    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionHandlerEvent<Self::Event>> {
47        self.inner.poll(cx).map(|e| e.map_event(|e| (self.map)(e)))
48    }
49}
50
51impl<THandler, O, TMap> InboundStreamHandler for MapEvent<THandler, TMap>
52where
53    THandler: InboundStreamHandler,
54    O: fmt::Debug + Send + Clone + 'static,
55    TMap: Send + 'static,
56    TMap: FnMut(THandler::Event) -> O,
57{
58    type InboundUpgrade = THandler::InboundUpgrade;
59    type InboundUserData = THandler::InboundUserData;
60
61    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundUpgrade, Self::InboundUserData> {
62        self.inner.listen_protocol()
63    }
64
65    fn on_fully_negotiated(
66        &mut self,
67        user_data: Self::InboundUserData,
68        protocol: <Self::InboundUpgrade as InboundUpgradeSend>::Output,
69    ) {
70        self.inner.on_fully_negotiated(user_data, protocol);
71    }
72
73    fn on_upgrade_error(
74        &mut self,
75        user_data: Self::InboundUserData,
76        error: <Self::InboundUpgrade as InboundUpgradeSend>::Error,
77    ) {
78        self.inner.on_upgrade_error(user_data, error);
79    }
80}
81
82impl<THandler, O, TMap> OutboundStreamHandler for MapEvent<THandler, TMap>
83where
84    THandler: OutboundStreamHandler,
85    O: fmt::Debug + Send + Clone + 'static,
86    TMap: Send + 'static,
87    TMap: FnMut(THandler::Event) -> O,
88{
89    type OutboundUpgrade = THandler::OutboundUpgrade;
90    type OutboundUserData = THandler::OutboundUserData;
91
92    fn on_fully_negotiated(
93        &mut self,
94        user_data: Self::OutboundUserData,
95        protocol: <Self::OutboundUpgrade as OutboundUpgradeSend>::Output,
96    ) {
97        self.inner.on_fully_negotiated(user_data, protocol);
98    }
99
100    fn on_upgrade_error(
101        &mut self,
102        user_data: Self::OutboundUserData,
103        error: StreamUpgradeError<<Self::OutboundUpgrade as OutboundUpgradeSend>::Error>,
104    ) {
105        self.inner.on_upgrade_error(user_data, error);
106    }
107
108    fn poll_outbound_request(
109        &mut self,
110        cx: &mut Context<'_>,
111    ) -> Poll<SubstreamProtocol<Self::OutboundUpgrade, Self::OutboundUserData>> {
112        self.inner.poll_outbound_request(cx)
113    }
114}
115
/// Adapter that pairs a handler with a mapping applied to incoming actions.
///
/// `O` is the action type accepted from the outside. It is not stored, so a
/// `PhantomData<O>` marker keeps the type parameter attached to the struct.
#[derive(Debug)]
pub struct MapAction<THandler, O, TMap> {
    /// The wrapped handler that every call is forwarded to.
    inner: THandler,
    /// The mapping invoked on each incoming action.
    map: TMap,
    /// Zero-sized marker tying `O` to this type.
    _marker: PhantomData<O>,
}

impl<THandler, O, TMap> MapAction<THandler, O, TMap> {
    /// Wraps `inner`, storing `map` alongside it for later action conversion.
    pub(crate) fn new(inner: THandler, map: TMap) -> Self {
        let _marker = PhantomData;
        MapAction {
            inner,
            map,
            _marker,
        }
    }
}
132
133impl<THandler, O, TMap> ConnectionHandler for MapAction<THandler, O, TMap>
134where
135    THandler: ConnectionHandler,
136    O: fmt::Debug + Send + Clone + 'static,
137    TMap: Send + 'static,
138    TMap: Fn(O) -> Option<THandler::Action>,
139{
140    type Action = O;
141    type Event = THandler::Event;
142
143    fn handle_action(&mut self, action: Self::Action) {
144        if let Some(action) = (self.map)(action) {
145            self.inner.handle_action(action);
146        }
147    }
148
149    fn connection_keep_alive(&self) -> bool {
150        self.inner.connection_keep_alive()
151    }
152
153    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Event>> {
154        self.inner.poll_close(cx)
155    }
156
157    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionHandlerEvent<Self::Event>> {
158        self.inner.poll(cx)
159    }
160}
161
162impl<THandler, O, TMap> InboundStreamHandler for MapAction<THandler, O, TMap>
163where
164    THandler: InboundStreamHandler,
165    O: fmt::Debug + Send + Clone + 'static,
166    TMap: Send + 'static,
167    TMap: Fn(O) -> Option<THandler::Action>,
168{
169    type InboundUpgrade = THandler::InboundUpgrade;
170    type InboundUserData = THandler::InboundUserData;
171
172    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundUpgrade, Self::InboundUserData> {
173        self.inner.listen_protocol()
174    }
175
176    fn on_fully_negotiated(
177        &mut self,
178        user_data: Self::InboundUserData,
179        protocol: <Self::InboundUpgrade as InboundUpgradeSend>::Output,
180    ) {
181        self.inner.on_fully_negotiated(user_data, protocol);
182    }
183
184    fn on_upgrade_error(
185        &mut self,
186        user_data: Self::InboundUserData,
187        error: <Self::InboundUpgrade as InboundUpgradeSend>::Error,
188    ) {
189        self.inner.on_upgrade_error(user_data, error);
190    }
191}
192
193impl<THandler, O, TMap> OutboundStreamHandler for MapAction<THandler, O, TMap>
194where
195    THandler: OutboundStreamHandler,
196    O: fmt::Debug + Send + Clone + 'static,
197    TMap: Send + 'static,
198    TMap: Fn(O) -> Option<THandler::Action>,
199{
200    type OutboundUpgrade = THandler::OutboundUpgrade;
201    type OutboundUserData = THandler::OutboundUserData;
202
203    fn on_fully_negotiated(
204        &mut self,
205        user_data: Self::OutboundUserData,
206        protocol: <Self::OutboundUpgrade as OutboundUpgradeSend>::Output,
207    ) {
208        self.inner.on_fully_negotiated(user_data, protocol);
209    }
210
211    fn on_upgrade_error(
212        &mut self,
213        user_data: Self::OutboundUserData,
214        error: StreamUpgradeError<<Self::OutboundUpgrade as OutboundUpgradeSend>::Error>,
215    ) {
216        self.inner.on_upgrade_error(user_data, error);
217    }
218
219    fn poll_outbound_request(
220        &mut self,
221        cx: &mut Context<'_>,
222    ) -> Poll<SubstreamProtocol<Self::OutboundUpgrade, Self::OutboundUserData>> {
223        self.inner.poll_outbound_request(cx)
224    }
225}