use crate::muxing::StreamMuxer;
use crate::nodes::node::{NodeEvent, NodeStream, Substream};
use futures::{prelude::*, stream::Fuse};
use std::{error, fmt, io};
mod tests;
pub trait NodeHandler {
type InEvent;
type OutEvent;
type Error;
type Substream;
type OutboundOpenInfo;
fn inject_substream(&mut self, substream: Self::Substream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);
fn inject_inbound_closed(&mut self);
fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo);
fn inject_event(&mut self, event: Self::InEvent);
fn shutdown(&mut self);
fn poll(&mut self) -> Poll<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>;
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NodeHandlerEndpoint<TOutboundOpenInfo> {
Dialer(TOutboundOpenInfo),
Listener,
}
impl<TOutboundOpenInfo> NodeHandlerEndpoint<TOutboundOpenInfo> {
#[inline]
pub fn is_dialer(&self) -> bool {
match self {
NodeHandlerEndpoint::Dialer(_) => true,
NodeHandlerEndpoint::Listener => false,
}
}
#[inline]
pub fn is_listener(&self) -> bool {
match self {
NodeHandlerEndpoint::Dialer(_) => false,
NodeHandlerEndpoint::Listener => true,
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
OutboundSubstreamRequest(TOutboundOpenInfo),
Shutdown,
Custom(TCustom),
}
impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
#[inline]
pub fn map_outbound_open_info<F, I>(self, map: F) -> NodeHandlerEvent<I, TCustom>
where F: FnOnce(TOutboundOpenInfo) -> I
{
match self {
NodeHandlerEvent::OutboundSubstreamRequest(val) => {
NodeHandlerEvent::OutboundSubstreamRequest(map(val))
},
NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown,
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val),
}
}
#[inline]
pub fn map_custom<F, I>(self, map: F) -> NodeHandlerEvent<TOutboundOpenInfo, I>
where F: FnOnce(TCustom) -> I
{
match self {
NodeHandlerEvent::OutboundSubstreamRequest(val) => {
NodeHandlerEvent::OutboundSubstreamRequest(val)
},
NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown,
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)),
}
}
}
pub struct HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
{
node: Fuse<NodeStream<TMuxer, THandler::OutboundOpenInfo>>,
handler: THandler,
handler_is_done: bool,
is_shutting_down: bool
}
impl<TMuxer, THandler> fmt::Debug for HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("HandledNode")
.field("node", &self.node)
.field("handler", &self.handler)
.field("handler_is_done", &self.handler_is_done)
.field("is_shutting_down", &self.is_shutting_down)
.finish()
}
}
impl<TMuxer, THandler> HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
{
#[inline]
pub fn new(muxer: TMuxer, handler: THandler) -> Self {
HandledNode {
node: NodeStream::new(muxer).fuse(),
handler,
handler_is_done: false,
is_shutting_down: false
}
}
pub fn handler(&self) -> &THandler{
&self.handler
}
pub fn handler_mut(&mut self) -> &mut THandler{
&mut self.handler
}
#[inline]
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
}
#[inline]
pub fn is_inbound_open(&self) -> bool {
self.node.get_ref().is_inbound_open()
}
#[inline]
pub fn is_outbound_open(&self) -> bool {
self.node.get_ref().is_outbound_open()
}
#[inline]
pub fn is_shutting_down(&self) -> bool {
self.is_shutting_down
}
pub fn shutdown(&mut self) {
self.node.get_mut().shutdown_all();
for user_data in self.node.get_mut().cancel_outgoing() {
self.handler.inject_outbound_closed(user_data);
}
self.handler.shutdown();
self.is_shutting_down = true;
}
}
impl<TMuxer, THandler> Stream for HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
{
type Item = THandler::OutEvent;
type Error = HandledNodeError<THandler::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.node.is_done() && self.handler_is_done {
return Ok(Async::Ready(None));
}
let mut node_not_ready = false;
match self.node.poll().map_err(HandledNodeError::Node)? {
Async::NotReady => node_not_ready = true,
Async::Ready(Some(NodeEvent::InboundSubstream { substream })) => {
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener)
}
Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream })) => {
let endpoint = NodeHandlerEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Async::Ready(None) => {
if !self.is_shutting_down {
self.is_shutting_down = true;
self.handler.shutdown()
}
}
Async::Ready(Some(NodeEvent::OutboundClosed { user_data })) => {
self.handler.inject_outbound_closed(user_data)
}
Async::Ready(Some(NodeEvent::InboundClosed)) => {
self.handler.inject_inbound_closed()
}
}
match if self.handler_is_done { Async::Ready(NodeHandlerEvent::Shutdown) } else { self.handler.poll().map_err(HandledNodeError::Handler)? } {
Async::NotReady => {
if node_not_ready {
break
}
}
Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(user_data)) => {
if self.node.get_ref().is_outbound_open() {
match self.node.get_mut().open_substream(user_data) {
Ok(()) => (),
Err(user_data) => {
self.handler.inject_outbound_closed(user_data)
},
}
} else {
self.handler.inject_outbound_closed(user_data);
}
}
Async::Ready(NodeHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(Some(event)));
}
Async::Ready(NodeHandlerEvent::Shutdown) => {
self.handler_is_done = true;
if !self.is_shutting_down {
self.is_shutting_down = true;
self.node.get_mut().cancel_outgoing();
self.node.get_mut().shutdown_all();
}
}
}
}
Ok(Async::NotReady)
}
}
#[derive(Debug)]
pub enum HandledNodeError<THandlerErr> {
Node(io::Error),
Handler(THandlerErr),
}
impl<THandlerErr> fmt::Display for HandledNodeError<THandlerErr>
where THandlerErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HandledNodeError::Node(err) => write!(f, "{}", err),
HandledNodeError::Handler(err) => write!(f, "{}", err),
}
}
}
impl<THandlerErr> error::Error for HandledNodeError<THandlerErr>
where THandlerErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
HandledNodeError::Node(err) => Some(err),
HandledNodeError::Handler(err) => Some(err),
}
}
}