use crate::muxing::{StreamMuxer, StreamMuxerEvent, SubstreamRef, substream_from_ref};
use futures::prelude::*;
use multiaddr::Multiaddr;
use smallvec::SmallVec;
use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
/// Which side of a substream an endpoint is.
///
/// The `Dialer` variant carries a caller-defined `TDialInfo` value; the
/// `Listener` variant carries no data.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SubstreamEndpoint<TDialInfo> {
    /// The dialing side, together with its associated dial information.
    Dialer(TDialInfo),
    /// The listening side.
    Listener,
}

impl<TDialInfo> SubstreamEndpoint<TDialInfo> {
    /// Returns `true` if this endpoint is the `Dialer` variant.
    pub fn is_dialer(&self) -> bool {
        matches!(self, SubstreamEndpoint::Dialer(_))
    }

    /// Returns `true` if this endpoint is the `Listener` variant.
    pub fn is_listener(&self) -> bool {
        // The enum has exactly two variants, so "listener" is "not dialer".
        !self.is_dialer()
    }
}
/// Wraps a stream muxer and keeps track of the outbound substreams that are
/// still in the process of being opened.
///
/// `TUserData` is an opaque value that the caller attaches to each outbound
/// substream attempt via `open_substream`.
pub struct Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// The wrapped muxer. It is kept in an `Arc` because clones of the handle are
/// handed to every substream produced by `poll` and to the `Close` future.
inner: Arc<TMuxer>,
/// Outbound substream attempts that have not resolved yet, each paired with
/// the user data supplied when the attempt was started.
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
}
/// Future returned by `Muxing::close`; polling it calls `close` on the muxer
/// until that call completes.
pub struct Close<TMuxer> {
/// Shared handle to the muxer being closed.
muxer: Arc<TMuxer>,
}
/// Substream type handed out by `Muxing`: a `SubstreamRef` that holds an
/// `Arc` handle to the muxer it belongs to.
pub type Substream<TMuxer> = SubstreamRef<Arc<TMuxer>>;
/// Event produced by `Muxing::poll`.
pub enum SubstreamEvent<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// The muxer reported a new inbound substream.
InboundSubstream {
/// The newly opened substream.
substream: Substream<TMuxer>,
},
/// An outbound substream attempt started with `open_substream` has completed
/// successfully.
OutboundSubstream {
/// The user data that was passed to `open_substream` for this attempt.
user_data: TUserData,
/// The newly opened substream.
substream: Substream<TMuxer>,
},
/// The muxer reported an address change; the payload is the address carried
/// by the muxer's `StreamMuxerEvent::AddressChange` event.
AddressChange(Multiaddr),
}
/// Newtype around a `usize`.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads
/// this type; presumably it identifies an outbound substream attempt — confirm
/// against callers.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct OutboundSubstreamId(usize);
impl<TMuxer, TUserData> Muxing<TMuxer, TUserData>
where
    TMuxer: StreamMuxer,
{
    /// Wraps `muxer` in a new `Muxing` with no outbound substream in progress.
    pub fn new(muxer: TMuxer) -> Self {
        let inner = Arc::new(muxer);
        let outbound_substreams = SmallVec::new();
        Muxing { inner, outbound_substreams }
    }

    /// Asks the muxer to start opening an outbound substream.
    ///
    /// `user_data` is stored next to the in-progress attempt. It is handed back
    /// in `SubstreamEvent::OutboundSubstream` once `poll` sees the attempt
    /// succeed, or returned by `cancel_outgoing` / `close` if the attempt is
    /// cancelled first.
    pub fn open_substream(&mut self, user_data: TUserData) {
        let pending = self.inner.open_outbound();
        self.outbound_substreams.push((user_data, pending));
    }

    /// Cancels every in-progress outbound substream and returns a future that
    /// closes the muxer, together with the user data of the cancelled attempts.
    #[must_use]
    pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
        // Cancel before building the future, so that nothing is left for the
        // `Drop` impl to destroy when `self` goes out of scope.
        let cancelled = self.cancel_outgoing();
        // `Muxing` implements `Drop`, so the `Arc` cannot be moved out of
        // `self`; hand the future its own handle instead.
        let muxer = self.inner.clone();
        (Close { muxer }, cancelled)
    }

    /// Destroys all in-progress outbound substreams and returns the user data
    /// that was attached to each of them.
    pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
        // Borrow the muxer separately so the closure does not capture `self`
        // while `outbound_substreams` is mutably borrowed by `drain`.
        let muxer = &self.inner;
        self.outbound_substreams
            .drain(..)
            .map(|(user_data, outbound)| {
                muxer.destroy_outbound(outbound);
                user_data
            })
            .collect()
    }

    /// Polls the muxer for the next event.
    ///
    /// The muxer's own event source is polled first. If it has nothing ready,
    /// every in-progress outbound substream is polled once.
    ///
    /// Returns `Poll::Ready(Ok(event))` for the first event found,
    /// `Poll::Ready(Err(_))` if the muxer or an outbound attempt fails (the
    /// user data of a failed attempt is dropped), and `Poll::Pending` when
    /// nothing is ready.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
        // Muxer-level events take priority over outbound substream progress.
        if let Poll::Ready(event) = self.inner.poll_event(cx) {
            return Poll::Ready(match event {
                Ok(StreamMuxerEvent::InboundSubstream(raw)) => {
                    let substream = substream_from_ref(self.inner.clone(), raw);
                    Ok(SubstreamEvent::InboundSubstream { substream })
                }
                Ok(StreamMuxerEvent::AddressChange(addr)) => Ok(SubstreamEvent::AddressChange(addr)),
                Err(err) => Err(err.into()),
            });
        }

        // Walk the pending attempts from the back. Each one is taken out with
        // `swap_remove` and, if still pending, pushed back at the end. The
        // element swapped into the vacated slot has already been visited, so
        // every attempt is polled exactly once per call.
        let mut index = self.outbound_substreams.len();
        while index > 0 {
            index -= 1;
            let (user_data, mut outbound) = self.outbound_substreams.swap_remove(index);

            let outcome = match self.inner.poll_outbound(cx, &mut outbound) {
                Poll::Ready(outcome) => outcome,
                Poll::Pending => {
                    self.outbound_substreams.push((user_data, outbound));
                    continue;
                }
            };

            // The attempt has resolved one way or the other: release the
            // muxer-side state and report the result.
            return Poll::Ready(match outcome {
                Ok(raw) => {
                    let substream = substream_from_ref(self.inner.clone(), raw);
                    self.inner.destroy_outbound(outbound);
                    Ok(SubstreamEvent::OutboundSubstream { user_data, substream })
                }
                Err(err) => {
                    self.inner.destroy_outbound(outbound);
                    Err(err.into())
                }
            });
        }

        Poll::Pending
    }
}
impl<TMuxer, TUserData> fmt::Debug for Muxing<TMuxer, TUserData>
where
    TMuxer: StreamMuxer,
{
    /// Formats the wrapper as `Muxing { outbound_substreams: <count> }`.
    ///
    /// Only the number of in-progress outbound substreams is printed; the
    /// muxer itself is left out, so no `Debug` bound is needed on it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let pending = self.outbound_substreams.len();
        let mut out = f.debug_struct("Muxing");
        out.field("outbound_substreams", &pending);
        out.finish()
    }
}
impl<TMuxer, TUserData> Drop for Muxing<TMuxer, TUserData>
where
    TMuxer: StreamMuxer,
{
    /// Destroys every outbound substream that is still being opened, so the
    /// muxer is told about each attempt that will never be polled again.
    fn drop(&mut self) {
        // Separate borrow of the muxer so the closure does not capture `self`
        // while `outbound_substreams` is being drained.
        let muxer = &self.inner;
        self.outbound_substreams
            .drain(..)
            .for_each(|(_, outbound)| muxer.destroy_outbound(outbound));
    }
}
impl<TMuxer> Future for Close<TMuxer>
where
    TMuxer: StreamMuxer,
{
    type Output = Result<(), IoError>;

    /// Drives the muxer's `close` operation.
    ///
    /// Stays `Pending` for as long as the muxer does; once the muxer is done,
    /// its result is forwarded with the error converted into an `IoError`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.muxer
            .close(cx)
            .map(|closed| closed.map_err(Into::into))
    }
}
impl<TMuxer> fmt::Debug for Close<TMuxer>
where
    TMuxer: StreamMuxer,
{
    /// Formats the future as a field-less `Close` struct; the muxer handle is
    /// not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Close");
        out.finish()
    }
}
impl<TMuxer, TUserData> fmt::Debug for SubstreamEvent<TMuxer, TUserData>
where
    TMuxer: StreamMuxer,
    TMuxer::Substream: fmt::Debug,
    TUserData: fmt::Debug,
{
    /// Formats the event as a struct labelled `SubstreamEvent::<Variant>`,
    /// listing that variant's payload as named fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SubstreamEvent::InboundSubstream { substream } => {
                // The label must name this variant. It previously read
                // "SubstreamEvent::OutboundClosed", which is not a variant of
                // this enum and made inbound events misleading in logs.
                f.debug_struct("SubstreamEvent::InboundSubstream")
                    .field("substream", substream)
                    .finish()
            }
            SubstreamEvent::OutboundSubstream { user_data, substream } => {
                f.debug_struct("SubstreamEvent::OutboundSubstream")
                    .field("user_data", user_data)
                    .field("substream", substream)
                    .finish()
            }
            SubstreamEvent::AddressChange(address) => {
                f.debug_struct("SubstreamEvent::AddressChange")
                    .field("address", address)
                    .finish()
            }
        }
    }
}