1use crate::{ConnectedPoint, Negotiated};
22use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName};
23use futures::{future::Either, prelude::*};
24use log::debug;
25use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
26use std::{iter, mem, pin::Pin, task::Context, task::Poll};
27
28pub use multistream_select::Version;
29
30pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint, v: Version)
32 -> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
33where
34 C: AsyncRead + AsyncWrite + Unpin,
35 U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
36{
37 if cp.is_listener() {
38 Either::Left(apply_inbound(conn, up))
39 } else {
40 Either::Right(apply_outbound(conn, up, v))
41 }
42}
43
44pub fn apply_inbound<C, U>(conn: C, up: U) -> InboundUpgradeApply<C, U>
46where
47 C: AsyncRead + AsyncWrite + Unpin,
48 U: InboundUpgrade<Negotiated<C>>,
49{
50 let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
51 let future = multistream_select::listener_select_proto(conn, iter);
52 InboundUpgradeApply {
53 inner: InboundUpgradeApplyState::Init { future, upgrade: up }
54 }
55}
56
57pub fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
59where
60 C: AsyncRead + AsyncWrite + Unpin,
61 U: OutboundUpgrade<Negotiated<C>>
62{
63 let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
64 let future = multistream_select::dialer_select_proto(conn, iter, v);
65 OutboundUpgradeApply {
66 inner: OutboundUpgradeApplyState::Init { future, upgrade: up }
67 }
68}
69
70pub struct InboundUpgradeApply<C, U>
72where
73 C: AsyncRead + AsyncWrite + Unpin,
74 U: InboundUpgrade<Negotiated<C>>
75{
76 inner: InboundUpgradeApplyState<C, U>
77}
78
79enum InboundUpgradeApplyState<C, U>
80where
81 C: AsyncRead + AsyncWrite + Unpin,
82 U: InboundUpgrade<Negotiated<C>>,
83{
84 Init {
85 future: ListenerSelectFuture<C, NameWrap<U::Info>>,
86 upgrade: U,
87 },
88 Upgrade {
89 future: Pin<Box<U::Future>>
90 },
91 Undefined
92}
93
94impl<C, U> Unpin for InboundUpgradeApply<C, U>
95where
96 C: AsyncRead + AsyncWrite + Unpin,
97 U: InboundUpgrade<Negotiated<C>>,
98{
99}
100
101impl<C, U> Future for InboundUpgradeApply<C, U>
102where
103 C: AsyncRead + AsyncWrite + Unpin,
104 U: InboundUpgrade<Negotiated<C>>,
105{
106 type Output = Result<U::Output, UpgradeError<U::Error>>;
107
108 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109 loop {
110 match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) {
111 InboundUpgradeApplyState::Init { mut future, upgrade } => {
112 let (info, io) = match Future::poll(Pin::new(&mut future), cx)? {
113 Poll::Ready(x) => x,
114 Poll::Pending => {
115 self.inner = InboundUpgradeApplyState::Init { future, upgrade };
116 return Poll::Pending
117 }
118 };
119 self.inner = InboundUpgradeApplyState::Upgrade {
120 future: Box::pin(upgrade.upgrade_inbound(io, info.0))
121 };
122 }
123 InboundUpgradeApplyState::Upgrade { mut future } => {
124 match Future::poll(Pin::new(&mut future), cx) {
125 Poll::Pending => {
126 self.inner = InboundUpgradeApplyState::Upgrade { future };
127 return Poll::Pending
128 }
129 Poll::Ready(Ok(x)) => {
130 debug!("Successfully applied negotiated protocol");
131 return Poll::Ready(Ok(x))
132 }
133 Poll::Ready(Err(e)) => {
134 debug!("Failed to apply negotiated protocol");
135 return Poll::Ready(Err(UpgradeError::Apply(e)))
136 }
137 }
138 }
139 InboundUpgradeApplyState::Undefined =>
140 panic!("InboundUpgradeApplyState::poll called after completion")
141 }
142 }
143 }
144}
145
146pub struct OutboundUpgradeApply<C, U>
148where
149 C: AsyncRead + AsyncWrite + Unpin,
150 U: OutboundUpgrade<Negotiated<C>>
151{
152 inner: OutboundUpgradeApplyState<C, U>
153}
154
155enum OutboundUpgradeApplyState<C, U>
156where
157 C: AsyncRead + AsyncWrite + Unpin,
158 U: OutboundUpgrade<Negotiated<C>>
159{
160 Init {
161 future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
162 upgrade: U
163 },
164 Upgrade {
165 future: Pin<Box<U::Future>>
166 },
167 Undefined
168}
169
170impl<C, U> Unpin for OutboundUpgradeApply<C, U>
171where
172 C: AsyncRead + AsyncWrite + Unpin,
173 U: OutboundUpgrade<Negotiated<C>>,
174{
175}
176
177impl<C, U> Future for OutboundUpgradeApply<C, U>
178where
179 C: AsyncRead + AsyncWrite + Unpin,
180 U: OutboundUpgrade<Negotiated<C>>,
181{
182 type Output = Result<U::Output, UpgradeError<U::Error>>;
183
184 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185 loop {
186 match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) {
187 OutboundUpgradeApplyState::Init { mut future, upgrade } => {
188 let (info, connection) = match Future::poll(Pin::new(&mut future), cx)? {
189 Poll::Ready(x) => x,
190 Poll::Pending => {
191 self.inner = OutboundUpgradeApplyState::Init { future, upgrade };
192 return Poll::Pending
193 }
194 };
195 self.inner = OutboundUpgradeApplyState::Upgrade {
196 future: Box::pin(upgrade.upgrade_outbound(connection, info.0))
197 };
198 }
199 OutboundUpgradeApplyState::Upgrade { mut future } => {
200 match Future::poll(Pin::new(&mut future), cx) {
201 Poll::Pending => {
202 self.inner = OutboundUpgradeApplyState::Upgrade { future };
203 return Poll::Pending
204 }
205 Poll::Ready(Ok(x)) => {
206 debug!("Successfully applied negotiated protocol");
207 return Poll::Ready(Ok(x))
208 }
209 Poll::Ready(Err(e)) => {
210 debug!("Failed to apply negotiated protocol");
211 return Poll::Ready(Err(UpgradeError::Apply(e)));
212 }
213 }
214 }
215 OutboundUpgradeApplyState::Undefined =>
216 panic!("OutboundUpgradeApplyState::poll called after completion")
217 }
218 }
219 }
220}
221
222type NameWrapIter<I> = iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;
223
224#[derive(Clone)]
226struct NameWrap<N>(N);
227
228impl<N: ProtocolName> AsRef<[u8]> for NameWrap<N> {
229 fn as_ref(&self) -> &[u8] {
230 self.0.protocol_name()
231 }
232}
233