use fnv::FnvHashMap;
use futures::{future, prelude::*, try_ready};
use parking_lot::Mutex;
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write};
use std::ops::Deref;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::{AsyncRead, AsyncWrite};
/// Selector passed to `StreamMuxer::shutdown_substream` and `StreamMuxer::shutdown`
/// to say which side(s) the shutdown applies to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Shutdown {
    /// The inbound side only.
    Inbound,
    /// The outbound side only.
    Outbound,
    /// Both sides.
    All,
}
/// Interface through which substreams are obtained from, and driven over, a muxer.
///
/// Every method takes `&self`; per-substream state is passed in explicitly as a
/// `Substream` or `OutboundSubstream` value instead of being borrowed from the muxer.
pub trait StreamMuxer {
/// Handle for an open substream, as yielded by `poll_inbound` and `poll_outbound`.
type Substream;
/// Handle for an outbound open attempt, as returned by `open_outbound`.
type OutboundSubstream;
/// Polls for the next inbound substream.
///
/// NOTE(review): the meaning of `Ready(None)` is not visible in this file — presumably
/// "no further substreams"; confirm against implementations.
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError>;
/// Starts an outbound open attempt. The returned value is what `poll_outbound` and
/// `destroy_outbound` accept.
fn open_outbound(&self) -> Self::OutboundSubstream;
/// Polls the outbound open attempt `s` created by `open_outbound`.
fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError>;
/// Consumes the outbound open attempt `s`. In this file it is invoked from the `Drop`
/// impl of `OutboundSubstreamRefFuture`.
fn destroy_outbound(&self, s: Self::OutboundSubstream);
/// Reads from substream `s` into `buf`; the `usize` is presumably the byte count read.
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError>;
/// Writes `buf` to substream `s`; the `usize` is presumably the byte count accepted.
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError>;
/// Polls a flush of substream `s`.
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError>;
/// Polls a shutdown of substream `s` for the side(s) selected by `kind`.
fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError>;
/// Consumes substream `s`. In this file it is invoked from the `Drop` impl of `SubstreamRef`.
fn destroy_substream(&self, s: Self::Substream);
/// Polls a shutdown of the muxer itself for the side(s) selected by `kind`.
fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError>;
/// Polls a flush at the muxer level (as opposed to `flush_substream`, which takes one substream).
fn flush_all(&self) -> Poll<(), IoError>;
}
/// Returns a future that polls `muxer.poll_inbound()` and wraps a yielded substream.
///
/// When the muxer yields `Some(substream)`, the future resolves to a `SubstreamRef`
/// holding a clone of `muxer` (taken up front) together with that substream. A `None`
/// from the muxer is passed through unchanged.
#[inline]
pub fn inbound_from_ref_and_wrap<P>(
    muxer: P,
) -> impl Future<Item = Option<SubstreamRef<P>>, Error = IoError>
where
    P: Deref + Clone,
    P::Target: StreamMuxer,
{
    // One handle drives the polling closure, the other is moved into the wrapper.
    let wrap_handle = muxer.clone();
    let polling = future::poll_fn(move || muxer.poll_inbound());
    polling.map(move |maybe_substream| {
        maybe_substream.map(|substream| substream_from_ref(wrap_handle, substream))
    })
}
/// Starts an outbound open attempt on `muxer` (via `outbound_from_ref`) and returns a
/// future whose output substream is wrapped in a `SubstreamRef`.
#[inline]
pub fn outbound_from_ref_and_wrap<P>(muxer: P) -> OutboundSubstreamRefWrapFuture<P>
where
    P: Deref + Clone,
    P::Target: StreamMuxer,
{
    OutboundSubstreamRefWrapFuture {
        inner: outbound_from_ref(muxer),
    }
}
/// Future returned by `outbound_from_ref_and_wrap`.
///
/// It drives an `OutboundSubstreamRefFuture` and wraps the substream it yields in a
/// `SubstreamRef` (see the `Future` impl for this type).
pub struct OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
// The underlying open attempt; its `muxer` field is cloned when wrapping the result.
inner: OutboundSubstreamRefFuture<P>,
}
impl<P> Future for OutboundSubstreamRefWrapFuture<P>
where
P: Deref + Clone,
P::Target: StreamMuxer,
{
type Item = Option<SubstreamRef<P>>;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(Some(substream))) => {
let out = substream_from_ref(self.inner.muxer.clone(), substream);
Ok(Async::Ready(Some(out)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
}
}
/// Calls `muxer.open_outbound()` immediately and returns a future that polls the
/// resulting open attempt. The future keeps `muxer` so it can poll, and later destroy,
/// the attempt.
#[inline]
pub fn outbound_from_ref<P>(muxer: P) -> OutboundSubstreamRefFuture<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    // Open first: `muxer` is moved into the struct right after.
    let outbound = Some(muxer.open_outbound());
    OutboundSubstreamRefFuture { muxer, outbound }
}
/// Future returned by `outbound_from_ref`; polls an outbound open attempt on a muxer.
///
/// Its `Drop` impl hands the open attempt back to the muxer via `destroy_outbound`.
pub struct OutboundSubstreamRefFuture<P>
where
P: Deref,
P::Target: StreamMuxer,
{
// Handle to the muxer that created `outbound`.
muxer: P,
// The open attempt. Set to `Some` by `outbound_from_ref`; only `Drop` takes it out.
outbound: Option<<P::Target as StreamMuxer>::OutboundSubstream>,
}
impl<P> Future for OutboundSubstreamRefFuture<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    type Item = Option<<P::Target as StreamMuxer>::Substream>;
    type Error = IoError;

    /// Forwards to `StreamMuxer::poll_outbound` with the stored open attempt.
    ///
    /// # Panics
    ///
    /// Panics with "outbound was empty" if the stored attempt is `None`.
    #[inline]
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let muxer = &*self.muxer;
        let attempt = self.outbound.as_mut().expect("outbound was empty");
        muxer.poll_outbound(attempt)
    }
}
impl<P> Drop for OutboundSubstreamRefFuture<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Takes the stored open attempt and passes it to `StreamMuxer::destroy_outbound`.
    ///
    /// # Panics
    ///
    /// Panics with "outbound was empty" if the attempt was already taken.
    #[inline]
    fn drop(&mut self) {
        let muxer = &*self.muxer;
        let attempt = self.outbound.take().expect("outbound was empty");
        muxer.destroy_outbound(attempt)
    }
}
/// Bundles a muxer handle and one of its substreams into a `SubstreamRef`, which
/// exposes the substream through the `Read`/`Write`/`AsyncRead`/`AsyncWrite` impls in
/// this file.
#[inline]
pub fn substream_from_ref<P>(
    muxer: P,
    substream: <P::Target as StreamMuxer>::Substream,
) -> SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    let substream = Some(substream);
    SubstreamRef { muxer, substream }
}
/// A substream paired with a handle to the muxer it belongs to.
///
/// The I/O trait impls for this type forward to the muxer's `*_substream` methods, and
/// its `Drop` impl passes the substream to `destroy_substream`.
pub struct SubstreamRef<P>
where
P: Deref,
P::Target: StreamMuxer,
{
// Handle to the muxer that owns `substream`.
muxer: P,
// The substream. Set to `Some` by `substream_from_ref`; only `Drop` takes it out.
substream: Option<<P::Target as StreamMuxer>::Substream>,
}
impl<P> fmt::Debug for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
    <P::Target as StreamMuxer>::Substream: fmt::Debug,
{
    /// Writes `Substream(<inner>)`, where `<inner>` is the `Debug` rendering of the
    /// stored `Option` holding the substream. The muxer handle is not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("Substream({:?})", self.substream))
    }
}
impl<P> Read for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Reads through `StreamMuxer::read_substream`, translating the poll result into
    /// blocking-style I/O: `Ready(n)` becomes `Ok(n)` and `NotReady` becomes a
    /// `WouldBlock` error. Muxer errors are returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
        let substream = self.substream.as_mut().expect("substream was empty");
        match self.muxer.read_substream(substream, buf) {
            Ok(Async::Ready(count)) => Ok(count),
            Ok(Async::NotReady) => Err(IoError::from(IoErrorKind::WouldBlock)),
            Err(err) => Err(err),
        }
    }
}
impl<P> AsyncRead for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Forwards directly to `StreamMuxer::read_substream`, returning its poll result.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, IoError> {
        let substream = self.substream.as_mut().expect("substream was empty");
        <P::Target as StreamMuxer>::read_substream(&*self.muxer, substream, buf)
    }
}
impl<P> Write for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Writes through `StreamMuxer::write_substream`: `Ready(n)` becomes `Ok(n)` and
    /// `NotReady` becomes a `WouldBlock` error. Muxer errors are returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
        let substream = self.substream.as_mut().expect("substream was empty");
        match self.muxer.write_substream(substream, buf) {
            Ok(Async::Ready(count)) => Ok(count),
            Ok(Async::NotReady) => Err(IoError::from(IoErrorKind::WouldBlock)),
            Err(err) => Err(err),
        }
    }

    /// Flushes through `StreamMuxer::flush_substream`: `Ready(())` becomes `Ok(())` and
    /// `NotReady` becomes a `WouldBlock` error. Muxer errors are returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn flush(&mut self) -> Result<(), IoError> {
        let substream = self.substream.as_mut().expect("substream was empty");
        match self.muxer.flush_substream(substream) {
            Ok(Async::Ready(())) => Ok(()),
            Ok(Async::NotReady) => Err(IoError::from(IoErrorKind::WouldBlock)),
            Err(err) => Err(err),
        }
    }
}
impl<P> AsyncWrite for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Forwards directly to `StreamMuxer::write_substream`, returning its poll result.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, IoError> {
        let s = self.substream.as_mut().expect("substream was empty");
        self.muxer.write_substream(s, buf)
    }

    /// Polls a shutdown of both sides of the substream (`Shutdown::All`).
    ///
    /// The muxer's poll result is returned as-is, so `NotReady` reaches the caller
    /// instead of being reported as a completed shutdown.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn shutdown(&mut self) -> Poll<(), IoError> {
        let s = self.substream.as_mut().expect("substream was empty");
        // Previously the `Async` state was discarded and `Ready` was always returned,
        // which told callers the shutdown had finished while it was still pending.
        self.muxer.shutdown_substream(s, Shutdown::All)
    }

    /// Forwards directly to `StreamMuxer::flush_substream`, returning its poll result.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the stored substream is `None`.
    #[inline]
    fn poll_flush(&mut self) -> Poll<(), IoError> {
        let s = self.substream.as_mut().expect("substream was empty");
        self.muxer.flush_substream(s)
    }
}
impl<P> Drop for SubstreamRef<P>
where
    P: Deref,
    P::Target: StreamMuxer,
{
    /// Takes the stored substream and passes it to `StreamMuxer::destroy_substream`.
    ///
    /// # Panics
    ///
    /// Panics with "substream was empty" if the substream was already taken.
    #[inline]
    fn drop(&mut self) {
        let muxer = &*self.muxer;
        let substream = self.substream.take().expect("substream was empty");
        muxer.destroy_substream(substream)
    }
}
pub struct StreamMuxerBox {
inner: Box<StreamMuxer<Substream = usize, OutboundSubstream = usize> + Send + Sync>,
}
impl StreamMuxerBox {
pub fn new<T>(muxer: T) -> StreamMuxerBox
where
T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send,
T::Substream: Send,
{
let wrap = Wrap {
inner: muxer,
substreams: Mutex::new(Default::default()),
next_substream: AtomicUsize::new(0),
outbound: Mutex::new(Default::default()),
next_outbound: AtomicUsize::new(0),
};
StreamMuxerBox {
inner: Box::new(wrap),
}
}
}
// `StreamMuxerBox` implements `StreamMuxer` by forwarding every call, unchanged, to the
// boxed muxer in `self.inner`. Substreams and outbound attempts are plain `usize` values.
impl StreamMuxer for StreamMuxerBox {
type Substream = usize;
type OutboundSubstream = usize;
// Forwards to the boxed muxer's `poll_inbound`.
#[inline]
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
self.inner.poll_inbound()
}
// Forwards to the boxed muxer's `open_outbound`.
#[inline]
fn open_outbound(&self) -> Self::OutboundSubstream {
self.inner.open_outbound()
}
// Forwards to the boxed muxer's `poll_outbound`.
#[inline]
fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll<Option<Self::Substream>, IoError> {
self.inner.poll_outbound(s)
}
// Forwards to the boxed muxer's `destroy_outbound`.
#[inline]
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
self.inner.destroy_outbound(substream)
}
// Forwards to the boxed muxer's `read_substream`.
#[inline]
fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
self.inner.read_substream(s, buf)
}
// Forwards to the boxed muxer's `write_substream`.
#[inline]
fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
self.inner.write_substream(s, buf)
}
// Forwards to the boxed muxer's `flush_substream`.
#[inline]
fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> {
self.inner.flush_substream(s)
}
// Forwards to the boxed muxer's `shutdown_substream`.
#[inline]
fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> {
self.inner.shutdown_substream(s, kind)
}
// Forwards to the boxed muxer's `destroy_substream`.
#[inline]
fn destroy_substream(&self, s: Self::Substream) {
self.inner.destroy_substream(s)
}
// Forwards to the boxed muxer's `shutdown`.
#[inline]
fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> {
self.inner.shutdown(kind)
}
// Forwards to the boxed muxer's `flush_all`.
#[inline]
fn flush_all(&self) -> Poll<(), IoError> {
self.inner.flush_all()
}
}
// Adapter that exposes a muxer `T` with `usize` substream and outbound handles.
// The real `T::Substream` / `T::OutboundSubstream` values are kept in the maps below,
// keyed by the `usize` id handed out to callers.
struct Wrap<T> where T: StreamMuxer {
// The wrapped muxer that all calls are ultimately forwarded to.
inner: T,
// Open substreams of the wrapped muxer, keyed by id.
substreams: Mutex<FnvHashMap<usize, T::Substream>>,
// Source of substream ids; incremented with `fetch_add` for each new substream.
next_substream: AtomicUsize,
// Outbound open attempts of the wrapped muxer, keyed by id.
outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
// Source of outbound ids; incremented with `fetch_add` for each new attempt.
next_outbound: AtomicUsize,
}
impl<T> StreamMuxer for Wrap<T>
where
    T: StreamMuxer,
{
    type Substream = usize;
    type OutboundSubstream = usize;

    /// Polls the wrapped muxer for an inbound substream. A yielded substream is stored
    /// in `substreams` under a freshly allocated id, and that id is returned.
    #[inline]
    fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
        let accepted = try_ready!(self.inner.poll_inbound());
        let id = accepted.map(|substream| {
            let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
            self.substreams.lock().insert(id, substream);
            id
        });
        Ok(Async::Ready(id))
    }

    /// Opens an outbound attempt on the wrapped muxer, stores it in `outbound` under a
    /// freshly allocated id, and returns that id.
    #[inline]
    fn open_outbound(&self) -> Self::OutboundSubstream {
        let attempt = self.inner.open_outbound();
        let id = self.next_outbound.fetch_add(1, Ordering::Relaxed);
        self.outbound.lock().insert(id, attempt);
        id
    }

    /// Polls the stored outbound attempt identified by `substream`. A yielded substream
    /// is stored in `substreams` under a freshly allocated id, and that id is returned.
    ///
    /// The `outbound` lock is held for the whole call.
    ///
    /// # Panics
    ///
    /// Panics if `substream` is not a key of the `outbound` table.
    #[inline]
    fn poll_outbound(
        &self,
        substream: &mut Self::OutboundSubstream,
    ) -> Poll<Option<Self::Substream>, IoError> {
        let mut pending = self.outbound.lock();
        let attempt = pending.get_mut(substream).unwrap();
        let opened = try_ready!(self.inner.poll_outbound(attempt));
        let id = opened.map(|new_substream| {
            let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
            self.substreams.lock().insert(id, new_substream);
            id
        });
        Ok(Async::Ready(id))
    }

    /// Removes the outbound attempt identified by `substream` from the table and hands
    /// it to the wrapped muxer's `destroy_outbound`.
    ///
    /// # Panics
    ///
    /// Panics if `substream` is not a key of the `outbound` table.
    #[inline]
    fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
        let mut pending = self.outbound.lock();
        let attempt = pending.remove(&substream).unwrap();
        self.inner.destroy_outbound(attempt)
    }

    /// Looks up substream `s` and forwards the read to the wrapped muxer.
    ///
    /// # Panics
    ///
    /// Panics if `s` is not a key of the `substreams` table.
    #[inline]
    fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, IoError> {
        let mut open = self.substreams.lock();
        let target = open.get_mut(s).unwrap();
        self.inner.read_substream(target, buf)
    }

    /// Looks up substream `s` and forwards the write to the wrapped muxer.
    ///
    /// # Panics
    ///
    /// Panics if `s` is not a key of the `substreams` table.
    #[inline]
    fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll<usize, IoError> {
        let mut open = self.substreams.lock();
        let target = open.get_mut(s).unwrap();
        self.inner.write_substream(target, buf)
    }

    /// Looks up substream `s` and forwards the flush to the wrapped muxer.
    ///
    /// # Panics
    ///
    /// Panics if `s` is not a key of the `substreams` table.
    #[inline]
    fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> {
        let mut open = self.substreams.lock();
        let target = open.get_mut(s).unwrap();
        self.inner.flush_substream(target)
    }

    /// Looks up substream `s` and forwards the shutdown to the wrapped muxer.
    ///
    /// # Panics
    ///
    /// Panics if `s` is not a key of the `substreams` table.
    #[inline]
    fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> {
        let mut open = self.substreams.lock();
        let target = open.get_mut(s).unwrap();
        self.inner.shutdown_substream(target, kind)
    }

    /// Removes the substream identified by `substream` from the table and hands it to
    /// the wrapped muxer's `destroy_substream`.
    ///
    /// # Panics
    ///
    /// Panics if `substream` is not a key of the `substreams` table.
    #[inline]
    fn destroy_substream(&self, substream: Self::Substream) {
        let mut open = self.substreams.lock();
        let target = open.remove(&substream).unwrap();
        self.inner.destroy_substream(target)
    }

    /// Forwards to the wrapped muxer's `shutdown`.
    #[inline]
    fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> {
        self.inner.shutdown(kind)
    }

    /// Forwards to the wrapped muxer's `flush_all`.
    #[inline]
    fn flush_all(&self) -> Poll<(), IoError> {
        self.inner.flush_all()
    }
}