1use fnv::FnvHashMap;
55use futures::{future, prelude::*, task::Context, task::Poll};
56use multiaddr::Multiaddr;
57use parking_lot::Mutex;
58use std::{io, ops::Deref, fmt, pin::Pin, sync::atomic::{AtomicUsize, Ordering}};
59
60pub use self::singleton::SingletonMuxer;
61
62mod singleton;
63
64pub trait StreamMuxer {
77 type Substream;
79
80 type OutboundSubstream;
82
83 type Error: Into<io::Error>;
85
86 fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
99
100 fn open_outbound(&self) -> Self::OutboundSubstream;
106
107 fn poll_outbound(&self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream)
117 -> Poll<Result<Self::Substream, Self::Error>>;
118
119 fn destroy_outbound(&self, s: Self::OutboundSubstream);
122
123 fn read_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8])
135 -> Poll<Result<usize, Self::Error>>;
136
137 fn write_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8])
149 -> Poll<Result<usize, Self::Error>>;
150
151 fn flush_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream)
162 -> Poll<Result<(), Self::Error>>;
163
164 fn shutdown_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream)
176 -> Poll<Result<(), Self::Error>>;
177
178 fn destroy_substream(&self, s: Self::Substream);
180
181 #[deprecated(note = "This method is unused and will be removed in the future")]
188 fn is_remote_acknowledged(&self) -> bool {
189 true
190 }
191
192 fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
205
206 fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
212}
213
214#[derive(Debug, Clone, PartialEq, Eq)]
216pub enum StreamMuxerEvent<T> {
217 InboundSubstream(T),
219
220 AddressChange(Multiaddr),
225}
226
227impl<T> StreamMuxerEvent<T> {
228 pub fn into_inbound_substream(self) -> Option<T> {
231 if let StreamMuxerEvent::InboundSubstream(s) = self {
232 Some(s)
233 } else {
234 None
235 }
236 }
237}
238
239pub fn event_from_ref_and_wrap<P>(
242 muxer: P,
243) -> impl Future<Output = Result<StreamMuxerEvent<SubstreamRef<P>>, <P::Target as StreamMuxer>::Error>>
244where
245 P: Deref + Clone,
246 P::Target: StreamMuxer,
247{
248 let muxer2 = muxer.clone();
249 future::poll_fn(move |cx| muxer.poll_event(cx))
250 .map_ok(|event| {
251 match event {
252 StreamMuxerEvent::InboundSubstream(substream) =>
253 StreamMuxerEvent::InboundSubstream(substream_from_ref(muxer2, substream)),
254 StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
255 }
256 })
257}
258
259pub fn outbound_from_ref_and_wrap<P>(muxer: P) -> OutboundSubstreamRefWrapFuture<P>
262where
263 P: Deref + Clone,
264 P::Target: StreamMuxer,
265{
266 let inner = outbound_from_ref(muxer);
267 OutboundSubstreamRefWrapFuture { inner }
268}
269
270pub struct OutboundSubstreamRefWrapFuture<P>
272where
273 P: Deref + Clone,
274 P::Target: StreamMuxer,
275{
276 inner: OutboundSubstreamRefFuture<P>,
277}
278
279impl<P> Future for OutboundSubstreamRefWrapFuture<P>
280where
281 P: Deref + Clone,
282 P::Target: StreamMuxer,
283{
284 type Output = Result<SubstreamRef<P>, <P::Target as StreamMuxer>::Error>;
285
286 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287 match Future::poll(Pin::new(&mut self.inner), cx) {
288 Poll::Ready(Ok(substream)) => {
289 let out = substream_from_ref(self.inner.muxer.clone(), substream);
290 Poll::Ready(Ok(out))
291 }
292 Poll::Pending => Poll::Pending,
293 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
294 }
295 }
296}
297
298pub fn outbound_from_ref<P>(muxer: P) -> OutboundSubstreamRefFuture<P>
300where
301 P: Deref,
302 P::Target: StreamMuxer,
303{
304 let outbound = muxer.open_outbound();
305 OutboundSubstreamRefFuture {
306 muxer,
307 outbound: Some(outbound),
308 }
309}
310
311pub struct OutboundSubstreamRefFuture<P>
313where
314 P: Deref,
315 P::Target: StreamMuxer,
316{
317 muxer: P,
318 outbound: Option<<P::Target as StreamMuxer>::OutboundSubstream>,
319}
320
321impl<P> Unpin for OutboundSubstreamRefFuture<P>
322where
323 P: Deref,
324 P::Target: StreamMuxer,
325{
326}
327
328impl<P> Future for OutboundSubstreamRefFuture<P>
329where
330 P: Deref,
331 P::Target: StreamMuxer,
332{
333 type Output = Result<<P::Target as StreamMuxer>::Substream, <P::Target as StreamMuxer>::Error>;
334
335 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
336 let this = &mut *self;
339 this.muxer.poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty"))
340 }
341}
342
343impl<P> Drop for OutboundSubstreamRefFuture<P>
344where
345 P: Deref,
346 P::Target: StreamMuxer,
347{
348 fn drop(&mut self) {
349 self.muxer
350 .destroy_outbound(self.outbound.take().expect("outbound was empty"))
351 }
352}
353
354pub fn substream_from_ref<P>(
357 muxer: P,
358 substream: <P::Target as StreamMuxer>::Substream,
359) -> SubstreamRef<P>
360where
361 P: Deref,
362 P::Target: StreamMuxer,
363{
364 SubstreamRef {
365 muxer,
366 substream: Some(substream),
367 shutdown_state: ShutdownState::Shutdown,
368 }
369}
370
371pub struct SubstreamRef<P>
373where
374 P: Deref,
375 P::Target: StreamMuxer,
376{
377 muxer: P,
378 substream: Option<<P::Target as StreamMuxer>::Substream>,
379 shutdown_state: ShutdownState,
380}
381
382enum ShutdownState {
383 Shutdown,
384 Flush,
385 Done,
386}
387
388impl<P> fmt::Debug for SubstreamRef<P>
389where
390 P: Deref,
391 P::Target: StreamMuxer,
392 <P::Target as StreamMuxer>::Substream: fmt::Debug,
393{
394 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
395 write!(f, "Substream({:?})", self.substream)
396 }
397}
398
399impl<P> Unpin for SubstreamRef<P>
400where
401 P: Deref,
402 P::Target: StreamMuxer,
403{
404}
405
406impl<P> AsyncRead for SubstreamRef<P>
407where
408 P: Deref,
409 P::Target: StreamMuxer,
410{
411 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
412 let this = &mut *self;
415
416 let s = this.substream.as_mut().expect("substream was empty");
417 this.muxer.read_substream(cx, s, buf).map_err(|e| e.into())
418 }
419}
420
421impl<P> AsyncWrite for SubstreamRef<P>
422where
423 P: Deref,
424 P::Target: StreamMuxer,
425{
426 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
427 let this = &mut *self;
430
431 let s = this.substream.as_mut().expect("substream was empty");
432 this.muxer.write_substream(cx, s, buf).map_err(|e| e.into())
433 }
434
435 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
436 let this = &mut *self;
439
440 let s = this.substream.as_mut().expect("substream was empty");
441 loop {
442 match this.shutdown_state {
443 ShutdownState::Shutdown => {
444 match this.muxer.shutdown_substream(cx, s) {
445 Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush,
446 Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
447 Poll::Pending => return Poll::Pending,
448 }
449 }
450 ShutdownState::Flush => {
451 match this.muxer.flush_substream(cx, s) {
452 Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done,
453 Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
454 Poll::Pending => return Poll::Pending,
455 }
456 }
457 ShutdownState::Done => {
458 return Poll::Ready(Ok(()));
459 }
460 }
461 }
462 }
463
464 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
465 let this = &mut *self;
468
469 let s = this.substream.as_mut().expect("substream was empty");
470 this.muxer.flush_substream(cx, s).map_err(|e| e.into())
471 }
472}
473
474impl<P> Drop for SubstreamRef<P>
475where
476 P: Deref,
477 P::Target: StreamMuxer,
478{
479 fn drop(&mut self) {
480 self.muxer.destroy_substream(self.substream.take().expect("substream was empty"))
481 }
482}
483
484pub struct StreamMuxerBox {
486 inner: Box<dyn StreamMuxer<Substream = usize, OutboundSubstream = usize, Error = io::Error> + Send + Sync>,
487}
488
489impl StreamMuxerBox {
490 pub fn new<T>(muxer: T) -> StreamMuxerBox
492 where
493 T: StreamMuxer + Send + Sync + 'static,
494 T::OutboundSubstream: Send,
495 T::Substream: Send,
496 {
497 let wrap = Wrap {
498 inner: muxer,
499 substreams: Mutex::new(Default::default()),
500 next_substream: AtomicUsize::new(0),
501 outbound: Mutex::new(Default::default()),
502 next_outbound: AtomicUsize::new(0),
503 };
504
505 StreamMuxerBox {
506 inner: Box::new(wrap),
507 }
508 }
509}
510
511impl StreamMuxer for StreamMuxerBox {
512 type Substream = usize; type OutboundSubstream = usize; type Error = io::Error;
515
516 #[inline]
517 fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
518 self.inner.poll_event(cx)
519 }
520
521 #[inline]
522 fn open_outbound(&self) -> Self::OutboundSubstream {
523 self.inner.open_outbound()
524 }
525
526 #[inline]
527 fn poll_outbound(&self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, Self::Error>> {
528 self.inner.poll_outbound(cx, s)
529 }
530
531 #[inline]
532 fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
533 self.inner.destroy_outbound(substream)
534 }
535
536 #[inline]
537 fn read_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
538 self.inner.read_substream(cx, s, buf)
539 }
540
541 #[inline]
542 fn write_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, Self::Error>> {
543 self.inner.write_substream(cx, s, buf)
544 }
545
546 #[inline]
547 fn flush_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
548 self.inner.flush_substream(cx, s)
549 }
550
551 #[inline]
552 fn shutdown_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
553 self.inner.shutdown_substream(cx, s)
554 }
555
556 #[inline]
557 fn destroy_substream(&self, s: Self::Substream) {
558 self.inner.destroy_substream(s)
559 }
560
561 #[inline]
562 fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
563 self.inner.close(cx)
564 }
565
566 #[inline]
567 fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
568 self.inner.flush_all(cx)
569 }
570}
571
572struct Wrap<T> where T: StreamMuxer {
573 inner: T,
574 substreams: Mutex<FnvHashMap<usize, T::Substream>>,
575 next_substream: AtomicUsize,
576 outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
577 next_outbound: AtomicUsize,
578}
579
580impl<T> StreamMuxer for Wrap<T>
581where
582 T: StreamMuxer,
583{
584 type Substream = usize; type OutboundSubstream = usize; type Error = io::Error;
587
588 #[inline]
589 fn poll_event(&self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
590 let substream = match self.inner.poll_event(cx) {
591 Poll::Pending => return Poll::Pending,
592 Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) =>
593 return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))),
594 Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s,
595 Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
596 };
597
598 let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
599 self.substreams.lock().insert(id, substream);
600 Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(id)))
601 }
602
603 #[inline]
604 fn open_outbound(&self) -> Self::OutboundSubstream {
605 let outbound = self.inner.open_outbound();
606 let id = self.next_outbound.fetch_add(1, Ordering::Relaxed);
607 self.outbound.lock().insert(id, outbound);
608 id
609 }
610
611 #[inline]
612 fn poll_outbound(
613 &self,
614 cx: &mut Context<'_>,
615 substream: &mut Self::OutboundSubstream,
616 ) -> Poll<Result<Self::Substream, Self::Error>> {
617 let mut list = self.outbound.lock();
618 let substream = match self.inner.poll_outbound(cx, list.get_mut(substream).unwrap()) {
619 Poll::Pending => return Poll::Pending,
620 Poll::Ready(Ok(s)) => s,
621 Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
622 };
623 let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
624 self.substreams.lock().insert(id, substream);
625 Poll::Ready(Ok(id))
626 }
627
628 #[inline]
629 fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
630 let mut list = self.outbound.lock();
631 self.inner.destroy_outbound(list.remove(&substream).unwrap())
632 }
633
634 #[inline]
635 fn read_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, Self::Error>> {
636 let mut list = self.substreams.lock();
637 self.inner.read_substream(cx, list.get_mut(s).unwrap(), buf).map_err(|e| e.into())
638 }
639
640 #[inline]
641 fn write_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, Self::Error>> {
642 let mut list = self.substreams.lock();
643 self.inner.write_substream(cx, list.get_mut(s).unwrap(), buf).map_err(|e| e.into())
644 }
645
646 #[inline]
647 fn flush_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
648 let mut list = self.substreams.lock();
649 self.inner.flush_substream(cx, list.get_mut(s).unwrap()).map_err(|e| e.into())
650 }
651
652 #[inline]
653 fn shutdown_substream(&self, cx: &mut Context<'_>, s: &mut Self::Substream) -> Poll<Result<(), Self::Error>> {
654 let mut list = self.substreams.lock();
655 self.inner.shutdown_substream(cx, list.get_mut(s).unwrap()).map_err(|e| e.into())
656 }
657
658 #[inline]
659 fn destroy_substream(&self, substream: Self::Substream) {
660 let mut list = self.substreams.lock();
661 self.inner.destroy_substream(list.remove(&substream).unwrap())
662 }
663
664 #[inline]
665 fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
666 self.inner.close(cx).map_err(|e| e.into())
667 }
668
669 #[inline]
670 fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
671 self.inner.flush_all(cx).map_err(|e| e.into())
672 }
673}