1use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
22use futures::prelude::*;
23use std::{pin::Pin, task::Context, task::Poll};
24
/// Upgrade adapter that pairs an inner `upgrade` with a closure `fun`.
///
/// As an inbound upgrade, `fun` is applied to the successful output of the
/// inner upgrade. As an outbound upgrade, calls are forwarded unchanged.
#[derive(Debug, Clone)]
pub struct MapInboundUpgrade<U, F> {
    /// The wrapped upgrade that performs the actual negotiation work.
    upgrade: U,
    /// Closure applied to the inbound output of `upgrade`.
    fun: F,
}
28
29impl<U, F> MapInboundUpgrade<U, F> {
30 pub fn new(upgrade: U, fun: F) -> Self {
31 MapInboundUpgrade { upgrade, fun }
32 }
33}
34
35impl<U, F> UpgradeInfo for MapInboundUpgrade<U, F>
36where
37 U: UpgradeInfo
38{
39 type Info = U::Info;
40 type InfoIter = U::InfoIter;
41
42 fn protocol_info(&self) -> Self::InfoIter {
43 self.upgrade.protocol_info()
44 }
45}
46
47impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgrade<U, F>
48where
49 U: InboundUpgrade<C>,
50 F: FnOnce(U::Output) -> T
51{
52 type Output = T;
53 type Error = U::Error;
54 type Future = MapFuture<U::Future, F>;
55
56 fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
57 MapFuture {
58 inner: self.upgrade.upgrade_inbound(sock, info),
59 map: Some(self.fun)
60 }
61 }
62}
63
64impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgrade<U, F>
65where
66 U: OutboundUpgrade<C>,
67{
68 type Output = U::Output;
69 type Error = U::Error;
70 type Future = U::Future;
71
72 fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
73 self.upgrade.upgrade_outbound(sock, info)
74 }
75}
76
/// Upgrade adapter that pairs an inner `upgrade` with a closure `fun`.
///
/// As an outbound upgrade, `fun` is applied to the successful output of the
/// inner upgrade. As an inbound upgrade, calls are forwarded unchanged.
#[derive(Debug, Clone)]
pub struct MapOutboundUpgrade<U, F> {
    /// The wrapped upgrade that performs the actual negotiation work.
    upgrade: U,
    /// Closure applied to the outbound output of `upgrade`.
    fun: F,
}
80
81impl<U, F> MapOutboundUpgrade<U, F> {
82 pub fn new(upgrade: U, fun: F) -> Self {
83 MapOutboundUpgrade { upgrade, fun }
84 }
85}
86
87impl<U, F> UpgradeInfo for MapOutboundUpgrade<U, F>
88where
89 U: UpgradeInfo
90{
91 type Info = U::Info;
92 type InfoIter = U::InfoIter;
93
94 fn protocol_info(&self) -> Self::InfoIter {
95 self.upgrade.protocol_info()
96 }
97}
98
99impl<C, U, F> InboundUpgrade<C> for MapOutboundUpgrade<U, F>
100where
101 U: InboundUpgrade<C>,
102{
103 type Output = U::Output;
104 type Error = U::Error;
105 type Future = U::Future;
106
107 fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
108 self.upgrade.upgrade_inbound(sock, info)
109 }
110}
111
112impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgrade<U, F>
113where
114 U: OutboundUpgrade<C>,
115 F: FnOnce(U::Output) -> T
116{
117 type Output = T;
118 type Error = U::Error;
119 type Future = MapFuture<U::Future, F>;
120
121 fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
122 MapFuture {
123 inner: self.upgrade.upgrade_outbound(sock, info),
124 map: Some(self.fun)
125 }
126 }
127}
128
/// Upgrade adapter that pairs an inner `upgrade` with an error-mapping closure `fun`.
///
/// As an inbound upgrade, `fun` is applied to the error of the inner upgrade.
/// As an outbound upgrade, calls are forwarded unchanged.
#[derive(Debug, Clone)]
pub struct MapInboundUpgradeErr<U, F> {
    /// The wrapped upgrade that performs the actual negotiation work.
    upgrade: U,
    /// Closure applied to the inbound error of `upgrade`.
    fun: F,
}
132
133impl<U, F> MapInboundUpgradeErr<U, F> {
134 pub fn new(upgrade: U, fun: F) -> Self {
135 MapInboundUpgradeErr { upgrade, fun }
136 }
137}
138
139impl<U, F> UpgradeInfo for MapInboundUpgradeErr<U, F>
140where
141 U: UpgradeInfo
142{
143 type Info = U::Info;
144 type InfoIter = U::InfoIter;
145
146 fn protocol_info(&self) -> Self::InfoIter {
147 self.upgrade.protocol_info()
148 }
149}
150
151impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgradeErr<U, F>
152where
153 U: InboundUpgrade<C>,
154 F: FnOnce(U::Error) -> T
155{
156 type Output = U::Output;
157 type Error = T;
158 type Future = MapErrFuture<U::Future, F>;
159
160 fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
161 MapErrFuture {
162 fut: self.upgrade.upgrade_inbound(sock, info),
163 fun: Some(self.fun)
164 }
165 }
166}
167
168impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgradeErr<U, F>
169where
170 U: OutboundUpgrade<C>,
171{
172 type Output = U::Output;
173 type Error = U::Error;
174 type Future = U::Future;
175
176 fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
177 self.upgrade.upgrade_outbound(sock, info)
178 }
179}
180
/// Upgrade adapter that pairs an inner `upgrade` with an error-mapping closure `fun`.
///
/// As an outbound upgrade, `fun` is applied to the error of the inner upgrade.
/// As an inbound upgrade, calls are forwarded unchanged.
#[derive(Debug, Clone)]
pub struct MapOutboundUpgradeErr<U, F> {
    /// The wrapped upgrade that performs the actual negotiation work.
    upgrade: U,
    /// Closure applied to the outbound error of `upgrade`.
    fun: F,
}
184
185impl<U, F> MapOutboundUpgradeErr<U, F> {
186 pub fn new(upgrade: U, fun: F) -> Self {
187 MapOutboundUpgradeErr { upgrade, fun }
188 }
189}
190
191impl<U, F> UpgradeInfo for MapOutboundUpgradeErr<U, F>
192where
193 U: UpgradeInfo
194{
195 type Info = U::Info;
196 type InfoIter = U::InfoIter;
197
198 fn protocol_info(&self) -> Self::InfoIter {
199 self.upgrade.protocol_info()
200 }
201}
202
203impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgradeErr<U, F>
204where
205 U: OutboundUpgrade<C>,
206 F: FnOnce(U::Error) -> T
207{
208 type Output = U::Output;
209 type Error = T;
210 type Future = MapErrFuture<U::Future, F>;
211
212 fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
213 MapErrFuture {
214 fut: self.upgrade.upgrade_outbound(sock, info),
215 fun: Some(self.fun)
216 }
217 }
218}
219
220impl<C, U, F> InboundUpgrade<C> for MapOutboundUpgradeErr<U, F>
221where
222 U: InboundUpgrade<C>
223{
224 type Output = U::Output;
225 type Error = U::Error;
226 type Future = U::Future;
227
228 fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
229 self.upgrade.upgrade_inbound(sock, info)
230 }
231}
232
233#[pin_project::pin_project]
234pub struct MapFuture<TInnerFut, TMap> {
235 #[pin]
236 inner: TInnerFut,
237 map: Option<TMap>,
238}
239
240impl<TInnerFut, TIn, TMap, TOut> Future for MapFuture<TInnerFut, TMap>
241where
242 TInnerFut: TryFuture<Ok = TIn>,
243 TMap: FnOnce(TIn) -> TOut,
244{
245 type Output = Result<TOut, TInnerFut::Error>;
246
247 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248 let this = self.project();
249 let item = match TryFuture::try_poll(this.inner, cx) {
250 Poll::Ready(Ok(v)) => v,
251 Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
252 Poll::Pending => return Poll::Pending,
253 };
254
255 let map = this.map.take().expect("Future has already finished");
256 Poll::Ready(Ok(map(item)))
257 }
258}
259
260#[pin_project::pin_project]
261pub struct MapErrFuture<T, F> {
262 #[pin]
263 fut: T,
264 fun: Option<F>,
265}
266
267impl<T, E, F, A> Future for MapErrFuture<T, F>
268where
269 T: TryFuture<Error = E>,
270 F: FnOnce(E) -> A,
271{
272 type Output = Result<T::Ok, A>;
273
274 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275 let this = self.project();
276 match TryFuture::try_poll(this.fut, cx) {
277 Poll::Pending => Poll::Pending,
278 Poll::Ready(Ok(x)) => Poll::Ready(Ok(x)),
279 Poll::Ready(Err(e)) => {
280 let f = this.fun.take().expect("Future has not resolved yet");
281 Poll::Ready(Err(f(e)))
282 }
283 }
284 }
285}
286