use futures::prelude::*;
use crate::muxing;
use smallvec::SmallVec;
use std::fmt;
use std::io::Error as IoError;
use std::sync::Arc;
pub struct NodeStream<TMuxer, TUserData>
where
TMuxer: muxing::StreamMuxer,
{
muxer: Arc<TMuxer>,
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
}
pub struct Close<TMuxer> {
muxer: Arc<TMuxer>,
}
pub type Substream<TMuxer> = muxing::SubstreamRef<Arc<TMuxer>>;
pub enum NodeEvent<TMuxer, TUserData>
where
TMuxer: muxing::StreamMuxer,
{
InboundSubstream {
substream: Substream<TMuxer>,
},
OutboundSubstream {
user_data: TUserData,
substream: Substream<TMuxer>,
},
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct OutboundSubstreamId(usize);
impl<TMuxer, TUserData> NodeStream<TMuxer, TUserData>
where
TMuxer: muxing::StreamMuxer,
{
#[inline]
pub fn new(muxer: TMuxer) -> Self {
NodeStream {
muxer: Arc::new(muxer),
outbound_substreams: SmallVec::new(),
}
}
pub fn open_substream(&mut self, user_data: TUserData) {
let raw = self.muxer.open_outbound();
self.outbound_substreams.push((user_data, raw));
}
pub fn is_remote_acknowledged(&self) -> bool {
self.muxer.is_remote_acknowledged()
}
#[must_use]
pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
let substreams = self.cancel_outgoing();
let close = Close { muxer: self.muxer.clone() };
(close, substreams)
}
pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
let mut out = Vec::with_capacity(self.outbound_substreams.len());
for (user_data, outbound) in self.outbound_substreams.drain() {
out.push(user_data);
self.muxer.destroy_outbound(outbound);
}
out
}
pub fn poll(&mut self) -> Poll<NodeEvent<TMuxer, TUserData>, IoError> {
match self.muxer.poll_inbound().map_err(|e| e.into())? {
Async::Ready(substream) => {
let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
return Ok(Async::Ready(NodeEvent::InboundSubstream {
substream,
}));
}
Async::NotReady => {}
}
for n in (0..self.outbound_substreams.len()).rev() {
let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
match self.muxer.poll_outbound(&mut outbound) {
Ok(Async::Ready(substream)) => {
let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
self.muxer.destroy_outbound(outbound);
return Ok(Async::Ready(NodeEvent::OutboundSubstream {
user_data,
substream,
}));
}
Ok(Async::NotReady) => {
self.outbound_substreams.push((user_data, outbound));
}
Err(err) => {
self.muxer.destroy_outbound(outbound);
return Err(err.into());
}
}
}
Ok(Async::NotReady)
}
}
impl<TMuxer, TUserData> fmt::Debug for NodeStream<TMuxer, TUserData>
where
TMuxer: muxing::StreamMuxer,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("NodeStream")
.field("outbound_substreams", &self.outbound_substreams.len())
.finish()
}
}
impl<TMuxer, TUserData> Drop for NodeStream<TMuxer, TUserData>
where
TMuxer: muxing::StreamMuxer,
{
fn drop(&mut self) {
for (_, outbound) in self.outbound_substreams.drain() {
self.muxer.destroy_outbound(outbound);
}
}
}
impl<TMuxer> Future for Close<TMuxer>
where
TMuxer: muxing::StreamMuxer,
{
type Item = ();
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.muxer.close().map_err(|e| e.into())
}
}
impl<TMuxer> fmt::Debug for Close<TMuxer>
where
TMuxer: muxing::StreamMuxer,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Close")
.finish()
}
}
impl<TMuxer, TUserData> fmt::Debug for NodeEvent<TMuxer, TUserData>
where
TMuxer: muxing::StreamMuxer,
TMuxer::Substream: fmt::Debug,
TUserData: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
NodeEvent::InboundSubstream { substream } => {
f.debug_struct("NodeEvent::OutboundClosed")
.field("substream", substream)
.finish()
},
NodeEvent::OutboundSubstream { user_data, substream } => {
f.debug_struct("NodeEvent::OutboundSubstream")
.field("user_data", user_data)
.field("substream", substream)
.finish()
},
}
}
}
#[cfg(test)]
mod node_stream {
use super::{NodeEvent, NodeStream};
use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use assert_matches::assert_matches;
use futures::prelude::*;
use tokio_mock_task::MockTask;
fn build_node_stream() -> NodeStream<DummyMuxer, Vec<u8>> {
let muxer = DummyMuxer::new();
NodeStream::<_, Vec<u8>>::new(muxer)
}
#[test]
fn closing_a_node_stream_destroys_substreams_and_returns_submitted_user_data() {
let mut ns = build_node_stream();
ns.open_substream(vec![2]);
ns.open_substream(vec![3]);
ns.open_substream(vec![5]);
let user_data_submitted = ns.close();
assert_eq!(user_data_submitted.1, vec![
vec![2], vec![3], vec![5]
]);
}
#[test]
fn poll_returns_not_ready_when_there_is_nothing_to_do() {
let mut task = MockTask::new();
task.enter(|| {
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Pending);
let mut ns = NodeStream::<_, Vec<u8>>::new(muxer);
assert_matches!(ns.poll(), Ok(Async::NotReady));
});
}
#[test]
fn poll_keeps_outbound_substreams_when_the_outgoing_connection_is_not_ready() {
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Pending);
muxer.set_outbound_connection_state(DummyConnectionState::Pending);
let mut ns = NodeStream::<_, Vec<u8>>::new(muxer);
ns.open_substream(vec![1]);
ns.poll().unwrap(); ns.poll().unwrap(); assert!(format!("{:?}", ns).contains("outbound_substreams: 1"));
}
#[test]
fn poll_returns_incoming_substream() {
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Opened);
let mut ns = NodeStream::<_, Vec<u8>>::new(muxer);
assert_matches!(ns.poll(), Ok(Async::Ready(node_event)) => {
assert_matches!(node_event, NodeEvent::InboundSubstream{ substream: _ });
});
}
}