1pub use crate::upgrade::Version;
24
25use crate::{
26 ConnectedPoint,
27 Negotiated,
28 transport::{
29 Transport,
30 TransportError,
31 ListenerEvent,
32 and_then::AndThen,
33 boxed::boxed,
34 timeout::TransportTimeout,
35 },
36 muxing::{StreamMuxer, StreamMuxerBox},
37 upgrade::{
38 self,
39 OutboundUpgrade,
40 InboundUpgrade,
41 apply_inbound,
42 apply_outbound,
43 UpgradeError,
44 OutboundUpgradeApply,
45 InboundUpgradeApply
46 },
47 PeerId
48};
49use futures::{prelude::*, ready};
50use multiaddr::Multiaddr;
51use std::{
52 error::Error,
53 fmt,
54 pin::Pin,
55 task::{Context, Poll},
56 time::Duration
57};
58
59#[derive(Clone)]
81pub struct Builder<T> {
82 inner: T,
83 version: upgrade::Version,
84}
85
86impl<T> Builder<T>
87where
88 T: Transport,
89 T::Error: 'static,
90{
91 pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
93 Builder { inner, version }
94 }
95
96 pub fn authenticate<C, D, U, E>(self, upgrade: U) -> Authenticated<
109 AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
110 > where
111 T: Transport<Output = C>,
112 C: AsyncRead + AsyncWrite + Unpin,
113 D: AsyncRead + AsyncWrite + Unpin,
114 U: InboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E>,
115 U: OutboundUpgrade<Negotiated<C>, Output = (PeerId, D), Error = E> + Clone,
116 E: Error + 'static,
117 {
118 let version = self.version;
119 Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| {
120 Authenticate {
121 inner: upgrade::apply(conn, upgrade, endpoint, version)
122 }
123 }), version))
124 }
125}
126
127#[pin_project::pin_project]
132pub struct Authenticate<C, U>
133where
134 C: AsyncRead + AsyncWrite + Unpin,
135 U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>
136{
137 #[pin]
138 inner: EitherUpgrade<C, U>
139}
140
141impl<C, U> Future for Authenticate<C, U>
142where
143 C: AsyncRead + AsyncWrite + Unpin,
144 U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>,
145 Output = <U as InboundUpgrade<Negotiated<C>>>::Output,
146 Error = <U as InboundUpgrade<Negotiated<C>>>::Error
147 >
148{
149 type Output = <EitherUpgrade<C, U> as Future>::Output;
150
151 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152 let this = self.project();
153 Future::poll(this.inner, cx)
154 }
155}
156
157#[pin_project::pin_project]
162pub struct Multiplex<C, U>
163where
164 C: AsyncRead + AsyncWrite + Unpin,
165 U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
166{
167 peer_id: Option<PeerId>,
168 #[pin]
169 upgrade: EitherUpgrade<C, U>,
170}
171
172impl<C, U, M, E> Future for Multiplex<C, U>
173where
174 C: AsyncRead + AsyncWrite + Unpin,
175 U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
176 U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E>
177{
178 type Output = Result<(PeerId, M), UpgradeError<E>>;
179
180 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181 let this = self.project();
182 let m = match ready!(Future::poll(this.upgrade, cx)) {
183 Ok(m) => m,
184 Err(err) => return Poll::Ready(Err(err)),
185 };
186 let i = this.peer_id.take().expect("Multiplex future polled after completion.");
187 Poll::Ready(Ok((i, m)))
188 }
189}
190
191#[derive(Clone)]
193pub struct Authenticated<T>(Builder<T>);
194
195impl<T> Authenticated<T>
196where
197 T: Transport,
198 T::Error: 'static
199{
200 pub fn apply<C, D, U, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
211 where
212 T: Transport<Output = (PeerId, C)>,
213 C: AsyncRead + AsyncWrite + Unpin,
214 D: AsyncRead + AsyncWrite + Unpin,
215 U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
216 U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
217 E: Error + 'static,
218 {
219 Authenticated(Builder::new(Upgrade::new(self.0.inner, upgrade), self.0.version))
220 }
221
222 pub fn multiplex<C, M, U, E>(self, upgrade: U) -> Multiplexed<
233 AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
234 > where
235 T: Transport<Output = (PeerId, C)>,
236 C: AsyncRead + AsyncWrite + Unpin,
237 M: StreamMuxer,
238 U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
239 U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
240 E: Error + 'static,
241 {
242 let version = self.0.version;
243 Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
244 let upgrade = upgrade::apply(c, upgrade, endpoint, version);
245 Multiplex { peer_id: Some(i), upgrade }
246 }))
247 }
248
249 pub fn multiplex_ext<C, M, U, E, F>(self, up: F) -> Multiplexed<
261 AndThen<T, impl FnOnce((PeerId, C), ConnectedPoint) -> Multiplex<C, U> + Clone>
262 > where
263 T: Transport<Output = (PeerId, C)>,
264 C: AsyncRead + AsyncWrite + Unpin,
265 M: StreamMuxer,
266 U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
267 U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
268 E: Error + 'static,
269 F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone
270 {
271 let version = self.0.version;
272 Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| {
273 let upgrade = upgrade::apply(c, up(&peer_id, &endpoint), endpoint, version);
274 Multiplex { peer_id: Some(peer_id), upgrade }
275 }))
276 }
277}
278
279#[derive(Clone)]
282pub struct Multiplexed<T>(T);
283
284impl<T> Multiplexed<T> {
285 pub fn boxed<M>(self) -> super::Boxed<(PeerId, StreamMuxerBox)>
288 where
289 T: Transport<Output = (PeerId, M)> + Sized + Clone + Send + Sync + 'static,
290 T::Dial: Send + 'static,
291 T::Listener: Send + 'static,
292 T::ListenerUpgrade: Send + 'static,
293 T::Error: Send + Sync,
294 M: StreamMuxer + Send + Sync + 'static,
295 M::Substream: Send + 'static,
296 M::OutboundSubstream: Send + 'static
297 {
298 boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
299 }
300
301 pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
304 Multiplexed(TransportTimeout::new(self.0, timeout))
305 }
306
307 pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
310 Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
311 }
312
313 pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
316 Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
317 }
318}
319
320impl<T> Transport for Multiplexed<T>
321where
322 T: Transport,
323{
324 type Output = T::Output;
325 type Error = T::Error;
326 type Listener = T::Listener;
327 type ListenerUpgrade = T::ListenerUpgrade;
328 type Dial = T::Dial;
329
330 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
331 self.0.dial(addr)
332 }
333
334 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
335 self.0.listen_on(addr)
336 }
337
338 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
339 self.0.address_translation(server, observed)
340 }
341}
342
/// An upgrade in progress in either direction: `Left` holds an
/// `InboundUpgradeApply`, `Right` an `OutboundUpgradeApply`.
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
345
/// A transport paired with an upgrade to apply on top of it.
#[derive(Debug, Copy, Clone)]
pub struct Upgrade<T, U> {
    /// The underlying transport.
    inner: T,
    /// The upgrade applied to connections of `inner`.
    upgrade: U,
}

impl<T, U> Upgrade<T, U> {
    /// Bundles `inner` and `upgrade` into an `Upgrade`.
    pub fn new(inner: T, upgrade: U) -> Self {
        Self { inner, upgrade }
    }
}
357
358impl<T, C, D, U, E> Transport for Upgrade<T, U>
359where
360 T: Transport<Output = (PeerId, C)>,
361 T::Error: 'static,
362 C: AsyncRead + AsyncWrite + Unpin,
363 U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
364 U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
365 E: Error + 'static
366{
367 type Output = (PeerId, D);
368 type Error = TransportUpgradeError<T::Error, E>;
369 type Listener = ListenerStream<T::Listener, U>;
370 type ListenerUpgrade = ListenerUpgradeFuture<T::ListenerUpgrade, U, C>;
371 type Dial = DialUpgradeFuture<T::Dial, U, C>;
372
373 fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
374 let future = self.inner.dial(addr)
375 .map_err(|err| err.map(TransportUpgradeError::Transport))?;
376 Ok(DialUpgradeFuture {
377 future: Box::pin(future),
378 upgrade: future::Either::Left(Some(self.upgrade))
379 })
380 }
381
382 fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
383 let stream = self.inner.listen_on(addr)
384 .map_err(|err| err.map(TransportUpgradeError::Transport))?;
385 Ok(ListenerStream {
386 stream: Box::pin(stream),
387 upgrade: self.upgrade
388 })
389 }
390
391 fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
392 self.inner.address_translation(server, observed)
393 }
394}
395
396#[derive(Debug)]
398pub enum TransportUpgradeError<T, U> {
399 Transport(T),
401 Upgrade(UpgradeError<U>),
403}
404
405impl<T, U> fmt::Display for TransportUpgradeError<T, U>
406where
407 T: fmt::Display,
408 U: fmt::Display,
409{
410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411 match self {
412 TransportUpgradeError::Transport(e) => write!(f, "Transport error: {}", e),
413 TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {}", e),
414 }
415 }
416}
417
418impl<T, U> Error for TransportUpgradeError<T, U>
419where
420 T: Error + 'static,
421 U: Error + 'static,
422{
423 fn source(&self) -> Option<&(dyn Error + 'static)> {
424 match self {
425 TransportUpgradeError::Transport(e) => Some(e),
426 TransportUpgradeError::Upgrade(e) => Some(e),
427 }
428 }
429}
430
431pub struct DialUpgradeFuture<F, U, C>
433where
434 U: OutboundUpgrade<Negotiated<C>>,
435 C: AsyncRead + AsyncWrite + Unpin,
436{
437 future: Pin<Box<F>>,
438 upgrade: future::Either<Option<U>, (Option<PeerId>, OutboundUpgradeApply<C, U>)>
439}
440
441impl<F, U, C, D> Future for DialUpgradeFuture<F, U, C>
442where
443 F: TryFuture<Ok = (PeerId, C)>,
444 C: AsyncRead + AsyncWrite + Unpin,
445 U: OutboundUpgrade<Negotiated<C>, Output = D>,
446 U::Error: Error
447{
448 type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
449
450 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
451 let this = &mut *self;
454
455 loop {
456 this.upgrade = match this.upgrade {
457 future::Either::Left(ref mut up) => {
458 let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
459 Ok(v) => v,
460 Err(err) => return Poll::Ready(Err(err)),
461 };
462 let u = up.take().expect("DialUpgradeFuture is constructed with Either::Left(Some).");
463 future::Either::Right((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
464 }
465 future::Either::Right((ref mut i, ref mut up)) => {
466 let d = match ready!(Future::poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
467 Ok(d) => d,
468 Err(err) => return Poll::Ready(Err(err)),
469 };
470 let i = i.take().expect("DialUpgradeFuture polled after completion.");
471 return Poll::Ready(Ok((i, d)))
472 }
473 }
474 }
475 }
476}
477
478impl<F, U, C> Unpin for DialUpgradeFuture<F, U, C>
479where
480 U: OutboundUpgrade<Negotiated<C>>,
481 C: AsyncRead + AsyncWrite + Unpin,
482{
483}
484
485pub struct ListenerStream<S, U> {
487 stream: Pin<Box<S>>,
488 upgrade: U
489}
490
491impl<S, U, F, C, D, E> Stream for ListenerStream<S, U>
492where
493 S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
494 F: TryFuture<Ok = (PeerId, C)>,
495 C: AsyncRead + AsyncWrite + Unpin,
496 U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
497{
498 type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
499
500 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501 match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
502 Some(Ok(event)) => {
503 let event = event
504 .map(move |future| {
505 ListenerUpgradeFuture {
506 future: Box::pin(future),
507 upgrade: future::Either::Left(Some(self.upgrade.clone()))
508 }
509 })
510 .map_err(TransportUpgradeError::Transport);
511 Poll::Ready(Some(Ok(event)))
512 }
513 Some(Err(err)) => {
514 Poll::Ready(Some(Err(TransportUpgradeError::Transport(err))))
515 }
516 None => Poll::Ready(None)
517 }
518 }
519}
520
521impl<S, U> Unpin for ListenerStream<S, U> {
522}
523
524pub struct ListenerUpgradeFuture<F, U, C>
526where
527 C: AsyncRead + AsyncWrite + Unpin,
528 U: InboundUpgrade<Negotiated<C>>
529{
530 future: Pin<Box<F>>,
531 upgrade: future::Either<Option<U>, (Option<PeerId>, InboundUpgradeApply<C, U>)>
532}
533
534impl<F, U, C, D> Future for ListenerUpgradeFuture<F, U, C>
535where
536 F: TryFuture<Ok = (PeerId, C)>,
537 C: AsyncRead + AsyncWrite + Unpin,
538 U: InboundUpgrade<Negotiated<C>, Output = D>,
539 U::Error: Error
540{
541 type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;
542
543 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544 let this = &mut *self;
547
548 loop {
549 this.upgrade = match this.upgrade {
550 future::Either::Left(ref mut up) => {
551 let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
552 Ok(v) => v,
553 Err(err) => return Poll::Ready(Err(err))
554 };
555 let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::Left(Some).");
556 future::Either::Right((Some(i), apply_inbound(c, u)))
557 }
558 future::Either::Right((ref mut i, ref mut up)) => {
559 let d = match ready!(TryFuture::try_poll(Pin::new(up), cx).map_err(TransportUpgradeError::Upgrade)) {
560 Ok(v) => v,
561 Err(err) => return Poll::Ready(Err(err))
562 };
563 let i = i.take().expect("ListenerUpgradeFuture polled after completion.");
564 return Poll::Ready(Ok((i, d)))
565 }
566 }
567 }
568 }
569}
570
571impl<F, U, C> Unpin for ListenerUpgradeFuture<F, U, C>
572where
573 C: AsyncRead + AsyncWrite + Unpin,
574 U: InboundUpgrade<Negotiated<C>>
575{
576}