use fnv::FnvHashMap;
use futures::{future, prelude::*, task::Context, task::Poll};
use multiaddr::Multiaddr;
use parking_lot::Mutex;
use std::{
fmt, io,
ops::Deref,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
};
pub use self::singleton::SingletonMuxer;
mod singleton;
/// Interface of a stream muxer: an object that hands out substreams and performs the I/O on
/// them.
///
/// Every method takes `&self`, so an implementation that tracks state has to use interior
/// mutability. Substreams and pending outbound attempts are plain values held by the caller and
/// passed back into the muxer for every operation; they are released through
/// `destroy_substream` and `destroy_outbound` respectively.
pub trait StreamMuxer {
/// Type of a substream handle, as yielded by `poll_event` and `poll_outbound`.
type Substream;
/// Type of the handle returned by `open_outbound` and driven by `poll_outbound` until it
/// yields a `Substream`.
type OutboundSubstream;
/// Error type of the fallible methods. Must be convertible into an `io::Error`.
type Error: Into<io::Error>;
/// Polls the muxer for its next event.
///
/// A ready value is either a `StreamMuxerEvent` (an inbound substream or an address change)
/// or an error.
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
/// Creates the handle for a new outbound substream attempt.
///
/// The handle is meant to be polled with `poll_outbound` and eventually handed to
/// `destroy_outbound`.
fn open_outbound(&self) -> Self::OutboundSubstream;
/// Polls the outbound attempt `s`, yielding the opened substream or an error once ready.
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>>;
/// Consumes an outbound attempt previously returned by `open_outbound`.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Polls a read from substream `s` into `buf`.
///
/// `SubstreamRef` returns the `usize` unchanged from `AsyncRead::poll_read`, so it is the
/// number of bytes placed into `buf`.
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>>;
/// Polls a write of `buf` to substream `s`.
///
/// `SubstreamRef` returns the `usize` unchanged from `AsyncWrite::poll_write`, so it is the
/// number of bytes accepted from `buf`.
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>>;
/// Polls a flush of substream `s`.
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>>;
/// Polls the shutdown of substream `s`.
///
/// `SubstreamRef::poll_close` calls this first and `flush_substream` afterwards.
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>>;
/// Consumes a substream handle. `SubstreamRef` calls this from its `Drop` implementation.
fn destroy_substream(&self, s: Self::Substream);
/// Deprecated and unused; the default implementation always returns `true`.
#[deprecated(note = "This method is unused and will be removed in the future")]
fn is_remote_acknowledged(&self) -> bool {
true
}
/// Polls the closing of the muxer as a whole.
///
/// NOTE(review): the effect on substreams that are still open is implementation-defined and
/// not visible from this trait — confirm against the concrete muxers.
fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Polls a flush at the level of the whole muxer rather than a single substream.
///
/// NOTE(review): presumably flushes every substream — confirm against the concrete muxers.
fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
/// Event produced by `StreamMuxer::poll_event`, generic over the substream type `T`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamMuxerEvent<T> {
/// The muxer yielded an inbound substream.
InboundSubstream(T),
/// The muxer reports an address change; carries the `Multiaddr` it reported.
/// NOTE(review): presumably the new address of the remote — confirm with the muxer
/// implementations.
AddressChange(Multiaddr),
}
impl<T> StreamMuxerEvent<T> {
    /// Consumes the event and extracts the substream it carries.
    ///
    /// Returns `Some` only for `InboundSubstream`; every other event yields `None`.
    pub fn into_inbound_substream(self) -> Option<T> {
        match self {
            StreamMuxerEvent::InboundSubstream(substream) => Some(substream),
            StreamMuxerEvent::AddressChange(_) => None,
        }
    }
}
/// Returns a future that polls `muxer` for its next event.
///
/// An inbound substream is wrapped in a [`SubstreamRef`] that owns a clone of `muxer`; an
/// address change is passed through untouched. Errors from `poll_event` are returned as-is.
pub fn event_from_ref_and_wrap<P>(
    muxer: P,
) -> impl Future<Output = Result<StreamMuxerEvent<SubstreamRef<P>>, <P::Target as StreamMuxer>::Error>>
where
    P: Deref + Clone,
    P::Target: StreamMuxer,
{
    // This clone ends up inside the `SubstreamRef` if the event carries a substream.
    let owner = muxer.clone();

    let wrap = move |event: StreamMuxerEvent<<P::Target as StreamMuxer>::Substream>| match event {
        StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
        StreamMuxerEvent::InboundSubstream(raw) => {
            StreamMuxerEvent::InboundSubstream(substream_from_ref(owner, raw))
        }
    };

    future::poll_fn(move |cx| muxer.poll_event(cx)).map_ok(wrap)
}
/// Starts opening an outbound substream on `muxer`.
///
/// Same as [`outbound_from_ref`], except that the returned future resolves to a
/// [`SubstreamRef`] instead of the muxer's raw substream type.
pub fn outbound_from_ref_and_wrap<P>(muxer: P) -> OutboundSubstreamRefWrapFuture<P>
where
    P: Deref + Clone,
    P::Target: StreamMuxer,
{
    OutboundSubstreamRefWrapFuture {
        inner: outbound_from_ref(muxer),
    }
}
/// Future returned by `outbound_from_ref_and_wrap`.
///
/// Resolves to a `SubstreamRef` wrapping the substream produced by the inner
/// `OutboundSubstreamRefFuture`.
pub struct OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
// The future that drives the outbound attempt; its `muxer` is cloned into the resulting
// `SubstreamRef`.
inner: OutboundSubstreamRefFuture<P>,
}
impl<P> Future for OutboundSubstreamRefWrapFuture<P>
where
    P: Deref + Clone,
    P::Target: StreamMuxer,
{
    type Output = Result<SubstreamRef<P>, <P::Target as StreamMuxer>::Error>;

    /// Drives the inner future and, on success, wraps the opened substream in a
    /// `SubstreamRef` that owns a clone of the muxer. Errors are forwarded unchanged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;

        let outcome = match Future::poll(Pin::new(&mut this.inner), cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome,
        };

        // The muxer is cloned only when a substream was actually opened.
        Poll::Ready(outcome.map(|raw| substream_from_ref(this.inner.muxer.clone(), raw)))
    }
}
/// Starts opening an outbound substream on `muxer`.
///
/// `open_outbound` is called immediately; the returned future keeps both the muxer and the
/// resulting outbound handle, and resolves to the muxer's raw substream type.
pub fn outbound_from_ref<P>(muxer: P) -> OutboundSubstreamRefFuture<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    let pending = Some(muxer.open_outbound());
    OutboundSubstreamRefFuture {
        outbound: pending,
        muxer,
    }
}
/// Future returned by `outbound_from_ref`.
///
/// Polling forwards to `StreamMuxer::poll_outbound`; dropping the future hands the outbound
/// handle to `StreamMuxer::destroy_outbound`.
pub struct OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
// Pointer-like value that dereferences to the muxer.
muxer: P,
// Outbound handle obtained from `open_outbound`. Within this file it is set to `Some` on
// construction and only taken out in `Drop`.
outbound: Option<<P::Target as StreamMuxer>::OutboundSubstream>,
}
// `OutboundSubstreamRefFuture` is declared `Unpin` unconditionally, i.e. regardless of whether
// `P` or the outbound handle type are `Unpin`. The `Future` impl relies on this to obtain a
// `&mut Self` out of the `Pin`.
impl<P> Unpin for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
}
impl<P> Future for OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
type Output = Result<<P::Target as StreamMuxer>::Substream, <P::Target as StreamMuxer>::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
this.muxer
.poll_outbound(cx, this.outbound.as_mut().expect("outbound was empty"))
}
}
impl<P> Drop for OutboundSubstreamRefFuture<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Hands the outbound handle back to the muxer through `destroy_outbound`.
    fn drop(&mut self) {
        let pending = self.outbound.take().expect("outbound was empty");
        self.muxer.destroy_outbound(pending);
    }
}
/// Wraps a raw `substream` of `muxer` into a [`SubstreamRef`].
///
/// The wrapper takes ownership of both arguments and starts with its close state machine at
/// the `Shutdown` step.
pub fn substream_from_ref<P>(
    muxer: P,
    substream: <P::Target as StreamMuxer>::Substream,
) -> SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    let substream = Some(substream);
    let shutdown_state = ShutdownState::Shutdown;
    SubstreamRef {
        muxer,
        substream,
        shutdown_state,
    }
}
/// A substream bundled with a pointer to the muxer it belongs to.
///
/// Implements `AsyncRead` and `AsyncWrite` by forwarding to the muxer's `*_substream` methods,
/// and passes the substream to `StreamMuxer::destroy_substream` when dropped.
pub struct SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
// Pointer-like value that dereferences to the muxer.
muxer: P,
// The raw substream. Within this file it is set to `Some` on construction and only taken out
// in `Drop`.
substream: Option<<P::Target as StreamMuxer>::Substream>,
// How far `poll_close` has progressed.
shutdown_state: ShutdownState,
}
/// Progress of `SubstreamRef::poll_close`.
enum ShutdownState {
/// `shutdown_substream` has not completed successfully yet; it is the next call to make.
Shutdown,
/// `shutdown_substream` completed; `flush_substream` is the next call to make.
Flush,
/// Shutdown and flush both completed; `poll_close` returns `Ok` right away.
Done,
}
impl<P> fmt::Debug for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
    <P::Target as StreamMuxer>::Substream: fmt::Debug,
{
    /// Prints `Substream(..)` around the debug form of the `substream` field; the muxer and
    /// the shutdown state are not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("Substream({:?})", self.substream))
    }
}
// `SubstreamRef` is declared `Unpin` unconditionally, i.e. regardless of whether `P` or the
// substream type are `Unpin`. The `AsyncRead`/`AsyncWrite` impls rely on this to obtain a
// `&mut Self` out of the `Pin`.
impl<P> Unpin for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
}
impl<P> AsyncRead for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
this.muxer.read_substream(cx, s, buf).map_err(|e| e.into())
}
}
impl<P> AsyncWrite for SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
this.muxer.write_substream(cx, s, buf).map_err(|e| e.into())
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
loop {
match this.shutdown_state {
ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) {
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => return Poll::Pending,
},
ShutdownState::Flush => match this.muxer.flush_substream(cx, s) {
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => return Poll::Pending,
},
ShutdownState::Done => {
return Poll::Ready(Ok(()));
}
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
let this = &mut *self;
let s = this.substream.as_mut().expect("substream was empty");
this.muxer.flush_substream(cx, s).map_err(|e| e.into())
}
}
impl<P> Drop for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Hands the raw substream back to the muxer through `destroy_substream`.
    fn drop(&mut self) {
        let raw = self.substream.take().expect("substream was empty");
        self.muxer.destroy_substream(raw);
    }
}
/// Type-erased stream muxer.
///
/// Holds a boxed `StreamMuxer` trait object whose substream and outbound handle types are both
/// `usize` and whose error type is `io::Error`. `StreamMuxerBox::new` builds it from any
/// suitable muxer by going through the private `Wrap` adapter.
pub struct StreamMuxerBox {
// The boxed, `Send + Sync` trait object that every method forwards to.
inner: Box<
dyn StreamMuxer<Substream = usize, OutboundSubstream = usize, Error = io::Error>
+ Send
+ Sync,
>,
}
impl StreamMuxerBox {
pub fn new<T>(muxer: T) -> StreamMuxerBox
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send,
{
let wrap = Wrap {
inner: muxer,
substreams: Mutex::new(Default::default()),
next_substream: AtomicUsize::new(0),
outbound: Mutex::new(Default::default()),
next_outbound: AtomicUsize::new(0),
};
StreamMuxerBox {
inner: Box::new(wrap),
}
}
}
// `StreamMuxerBox` implements `StreamMuxer` purely by delegation: every method below forwards
// its arguments to the boxed trait object in `self.inner` and returns its result unchanged.
// `is_remote_acknowledged` is not overridden, so the trait's default applies.
impl StreamMuxer for StreamMuxerBox {
// Substreams and outbound attempts are identified by `usize` values; errors are `io::Error`.
type Substream = usize; type OutboundSubstream = usize; type Error = io::Error;
/// Forwards to the boxed muxer's `poll_event`.
#[inline]
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
self.inner.poll_event(cx)
}
/// Forwards to the boxed muxer's `open_outbound`.
#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
self.inner.open_outbound()
}
/// Forwards to the boxed muxer's `poll_outbound`.
#[inline]
fn poll_outbound(
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
self.inner.poll_outbound(cx, s)
}
/// Forwards to the boxed muxer's `destroy_outbound`.
#[inline]
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
self.inner.destroy_outbound(substream)
}
/// Forwards to the boxed muxer's `read_substream`.
#[inline]
fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
self.inner.read_substream(cx, s, buf)
}
/// Forwards to the boxed muxer's `write_substream`.
#[inline]
fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
self.inner.write_substream(cx, s, buf)
}
/// Forwards to the boxed muxer's `flush_substream`.
#[inline]
fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
self.inner.flush_substream(cx, s)
}
/// Forwards to the boxed muxer's `shutdown_substream`.
#[inline]
fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
self.inner.shutdown_substream(cx, s)
}
/// Forwards to the boxed muxer's `destroy_substream`.
#[inline]
fn destroy_substream(&self, s: Self::Substream) {
self.inner.destroy_substream(s)
}
/// Forwards to the boxed muxer's `close`.
#[inline]
fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.close(cx)
}
/// Forwards to the boxed muxer's `flush_all`.
#[inline]
fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.flush_all(cx)
}
}
/// Adapter that exposes an arbitrary `StreamMuxer` through `usize` identifiers.
///
/// The inner muxer's substreams and outbound handles are stored in two tables; callers only
/// ever see the `usize` key under which a value was stored.
struct Wrap<T>
where
T: StreamMuxer,
{
// The muxer being adapted.
inner: T,
// Substreams of `inner`, keyed by the identifier handed out for them.
substreams: Mutex<FnvHashMap<usize, T::Substream>>,
// Counter supplying the identifier for the next entry of `substreams`.
next_substream: AtomicUsize,
// Outbound handles of `inner`, keyed by the identifier handed out for them.
outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
// Counter supplying the identifier for the next entry of `outbound`.
next_outbound: AtomicUsize,
}
impl<T> Wrap<T>
where
    T: StreamMuxer,
{
    /// Stores `substream` under a freshly allocated identifier and returns that identifier.
    fn register_substream(&self, substream: T::Substream) -> usize {
        let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
        self.substreams.lock().insert(id, substream);
        id
    }
}

// Implements `StreamMuxer` on top of the inner muxer by translating between `usize`
// identifiers and the inner muxer's own substream / outbound handle values. Errors of the
// inner muxer are converted into `io::Error`. Looking up an identifier that is not in the
// corresponding table panics.
impl<T> StreamMuxer for Wrap<T>
where
    T: StreamMuxer,
{
    type Substream = usize;
    type OutboundSubstream = usize;
    type Error = io::Error;

    /// Polls the inner muxer for an event. An inbound substream is stored in the substream
    /// table and replaced by its identifier; an address change is passed through.
    #[inline]
    fn poll_event(
        &self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
        let event = match self.inner.poll_event(cx) {
            Poll::Ready(Ok(event)) => event,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
            Poll::Pending => return Poll::Pending,
        };

        let translated = match event {
            StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
            StreamMuxerEvent::InboundSubstream(raw) => {
                StreamMuxerEvent::InboundSubstream(self.register_substream(raw))
            }
        };
        Poll::Ready(Ok(translated))
    }

    /// Opens an outbound attempt on the inner muxer, stores the handle in the outbound table
    /// and returns its identifier.
    #[inline]
    fn open_outbound(&self) -> Self::OutboundSubstream {
        let handle = self.inner.open_outbound();
        let key = self.next_outbound.fetch_add(1, Ordering::Relaxed);
        self.outbound.lock().insert(key, handle);
        key
    }

    /// Polls the outbound attempt identified by `substream`. Once the inner muxer yields a
    /// substream, it is stored in the substream table and its identifier is returned.
    #[inline]
    fn poll_outbound(
        &self,
        cx: &mut Context<'_>,
        substream: &mut Self::OutboundSubstream,
    ) -> Poll<Result<Self::Substream, Self::Error>> {
        // The outbound table stays locked until this function returns.
        let mut pending = self.outbound.lock();
        let attempt = pending.get_mut(&*substream).unwrap();

        let opened = match self.inner.poll_outbound(cx, attempt) {
            Poll::Ready(Ok(opened)) => opened,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
            Poll::Pending => return Poll::Pending,
        };

        Poll::Ready(Ok(self.register_substream(opened)))
    }

    /// Removes the outbound attempt identified by `substream` from the table and passes it to
    /// the inner muxer's `destroy_outbound`.
    #[inline]
    fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
        let mut pending = self.outbound.lock();
        let handle = pending.remove(&substream).unwrap();
        self.inner.destroy_outbound(handle)
    }

    /// Reads from the substream identified by `s` through the inner muxer.
    #[inline]
    fn read_substream(
        &self,
        cx: &mut Context<'_>,
        s: &mut Self::Substream,
        buf: &mut [u8],
    ) -> Poll<Result<usize, Self::Error>> {
        let mut table = self.substreams.lock();
        let raw = table.get_mut(&*s).unwrap();
        self.inner.read_substream(cx, raw, buf).map_err(Into::into)
    }

    /// Writes to the substream identified by `s` through the inner muxer.
    #[inline]
    fn write_substream(
        &self,
        cx: &mut Context<'_>,
        s: &mut Self::Substream,
        buf: &[u8],
    ) -> Poll<Result<usize, Self::Error>> {
        let mut table = self.substreams.lock();
        let raw = table.get_mut(&*s).unwrap();
        self.inner.write_substream(cx, raw, buf).map_err(Into::into)
    }

    /// Flushes the substream identified by `s` through the inner muxer.
    #[inline]
    fn flush_substream(
        &self,
        cx: &mut Context<'_>,
        s: &mut Self::Substream,
    ) -> Poll<Result<(), Self::Error>> {
        let mut table = self.substreams.lock();
        let raw = table.get_mut(&*s).unwrap();
        self.inner.flush_substream(cx, raw).map_err(Into::into)
    }

    /// Shuts down the substream identified by `s` through the inner muxer.
    #[inline]
    fn shutdown_substream(
        &self,
        cx: &mut Context<'_>,
        s: &mut Self::Substream,
    ) -> Poll<Result<(), Self::Error>> {
        let mut table = self.substreams.lock();
        let raw = table.get_mut(&*s).unwrap();
        self.inner.shutdown_substream(cx, raw).map_err(Into::into)
    }

    /// Removes the substream identified by `substream` from the table and passes it to the
    /// inner muxer's `destroy_substream`.
    #[inline]
    fn destroy_substream(&self, substream: Self::Substream) {
        let mut table = self.substreams.lock();
        let raw = table.remove(&substream).unwrap();
        self.inner.destroy_substream(raw)
    }

    /// Forwards to the inner muxer's `close`.
    #[inline]
    fn close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.close(cx).map_err(Into::into)
    }

    /// Forwards to the inner muxer's `flush_all`.
    #[inline]
    fn flush_all(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.flush_all(cx).map_err(Into::into)
    }
}