mwc_libp2p_core/upgrade/
apply.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{ConnectedPoint, Negotiated};
22use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName};
23use futures::{future::Either, prelude::*};
24use log::debug;
25use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
26use std::{iter, mem, pin::Pin, task::Context, task::Poll};
27
28pub use multistream_select::Version;
29
30/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
31pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint, v: Version)
32    -> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
33where
34    C: AsyncRead + AsyncWrite + Unpin,
35    U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
36{
37    if cp.is_listener() {
38        Either::Left(apply_inbound(conn, up))
39    } else {
40        Either::Right(apply_outbound(conn, up, v))
41    }
42}
43
44/// Tries to perform an upgrade on an inbound connection or substream.
45pub fn apply_inbound<C, U>(conn: C, up: U) -> InboundUpgradeApply<C, U>
46where
47    C: AsyncRead + AsyncWrite + Unpin,
48    U: InboundUpgrade<Negotiated<C>>,
49{
50    let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
51    let future = multistream_select::listener_select_proto(conn, iter);
52    InboundUpgradeApply {
53        inner: InboundUpgradeApplyState::Init { future, upgrade: up }
54    }
55}
56
57/// Tries to perform an upgrade on an outbound connection or substream.
58pub fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
59where
60    C: AsyncRead + AsyncWrite + Unpin,
61    U: OutboundUpgrade<Negotiated<C>>
62{
63    let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
64    let future = multistream_select::dialer_select_proto(conn, iter, v);
65    OutboundUpgradeApply {
66        inner: OutboundUpgradeApplyState::Init { future, upgrade: up }
67    }
68}
69
70/// Future returned by `apply_inbound`. Drives the upgrade process.
71pub struct InboundUpgradeApply<C, U>
72where
73    C: AsyncRead + AsyncWrite + Unpin,
74    U: InboundUpgrade<Negotiated<C>>
75{
76    inner: InboundUpgradeApplyState<C, U>
77}
78
79enum InboundUpgradeApplyState<C, U>
80where
81    C: AsyncRead + AsyncWrite + Unpin,
82    U: InboundUpgrade<Negotiated<C>>,
83{
84    Init {
85        future: ListenerSelectFuture<C, NameWrap<U::Info>>,
86        upgrade: U,
87    },
88    Upgrade {
89        future: Pin<Box<U::Future>>
90    },
91    Undefined
92}
93
94impl<C, U> Unpin for InboundUpgradeApply<C, U>
95where
96    C: AsyncRead + AsyncWrite + Unpin,
97    U: InboundUpgrade<Negotiated<C>>,
98{
99}
100
101impl<C, U> Future for InboundUpgradeApply<C, U>
102where
103    C: AsyncRead + AsyncWrite + Unpin,
104    U: InboundUpgrade<Negotiated<C>>,
105{
106    type Output = Result<U::Output, UpgradeError<U::Error>>;
107
108    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109        loop {
110            match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) {
111                InboundUpgradeApplyState::Init { mut future, upgrade } => {
112                    let (info, io) = match Future::poll(Pin::new(&mut future), cx)? {
113                        Poll::Ready(x) => x,
114                        Poll::Pending => {
115                            self.inner = InboundUpgradeApplyState::Init { future, upgrade };
116                            return Poll::Pending
117                        }
118                    };
119                    self.inner = InboundUpgradeApplyState::Upgrade {
120                        future: Box::pin(upgrade.upgrade_inbound(io, info.0))
121                    };
122                }
123                InboundUpgradeApplyState::Upgrade { mut future } => {
124                    match Future::poll(Pin::new(&mut future), cx) {
125                        Poll::Pending => {
126                            self.inner = InboundUpgradeApplyState::Upgrade { future };
127                            return Poll::Pending
128                        }
129                        Poll::Ready(Ok(x)) => {
130                            debug!("Successfully applied negotiated protocol");
131                            return Poll::Ready(Ok(x))
132                        }
133                        Poll::Ready(Err(e)) => {
134                            debug!("Failed to apply negotiated protocol");
135                            return Poll::Ready(Err(UpgradeError::Apply(e)))
136                        }
137                    }
138                }
139                InboundUpgradeApplyState::Undefined =>
140                    panic!("InboundUpgradeApplyState::poll called after completion")
141            }
142        }
143    }
144}
145
146/// Future returned by `apply_outbound`. Drives the upgrade process.
147pub struct OutboundUpgradeApply<C, U>
148where
149    C: AsyncRead + AsyncWrite + Unpin,
150    U: OutboundUpgrade<Negotiated<C>>
151{
152    inner: OutboundUpgradeApplyState<C, U>
153}
154
155enum OutboundUpgradeApplyState<C, U>
156where
157    C: AsyncRead + AsyncWrite + Unpin,
158    U: OutboundUpgrade<Negotiated<C>>
159{
160    Init {
161        future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
162        upgrade: U
163    },
164    Upgrade {
165        future: Pin<Box<U::Future>>
166    },
167    Undefined
168}
169
170impl<C, U> Unpin for OutboundUpgradeApply<C, U>
171where
172    C: AsyncRead + AsyncWrite + Unpin,
173    U: OutboundUpgrade<Negotiated<C>>,
174{
175}
176
177impl<C, U> Future for OutboundUpgradeApply<C, U>
178where
179    C: AsyncRead + AsyncWrite + Unpin,
180    U: OutboundUpgrade<Negotiated<C>>,
181{
182    type Output = Result<U::Output, UpgradeError<U::Error>>;
183
184    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185        loop {
186            match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) {
187                OutboundUpgradeApplyState::Init { mut future, upgrade } => {
188                    let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? {
189                        Poll::Ready(x) => x,
190                        Poll::Pending => {
191                            self.inner = OutboundUpgradeApplyState::Init { future, upgrade };
192                            return Poll::Pending
193                        }
194                    };
195                    self.inner = OutboundUpgradeApplyState::Upgrade {
196                        future: Box::pin(upgrade.upgrade_outbound(connection, info.0))
197                    };
198                }
199                OutboundUpgradeApplyState::Upgrade { mut future } => {
200                    match Future::poll(Pin::new(&mut future), cx) {
201                        Poll::Pending => {
202                            self.inner = OutboundUpgradeApplyState::Upgrade { future };
203                            return Poll::Pending
204                        }
205                        Poll::Ready(Ok(x)) => {
206                            debug!("Successfully applied negotiated protocol");
207                            return Poll::Ready(Ok(x))
208                        }
209                        Poll::Ready(Err(e)) => {
210                            debug!("Failed to apply negotiated protocol");
211                            return Poll::Ready(Err(UpgradeError::Apply(e)));
212                        }
213                    }
214                }
215                OutboundUpgradeApplyState::Undefined =>
216                    panic!("OutboundUpgradeApplyState::poll called after completion")
217            }
218        }
219    }
220}
221
222type NameWrapIter<I> = iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;
223
/// Newtype around a protocol name.
///
/// It gives every `ProtocolName` implementor an `AsRef<[u8]>` view, which is
/// the form in which protocol names are passed to the `multistream_select`
/// negotiation functions in this file.
#[derive(Clone)]
struct NameWrap<N>(N);
227
228impl<N: ProtocolName> AsRef<[u8]> for NameWrap<N> {
229    fn as_ref(&self) -> &[u8] {
230        self.0.protocol_name()
231    }
232}
233