use crate::{PeerId, muxing::StreamMuxer};
use crate::nodes::node::{NodeEvent, NodeStream, Substream, Close};
use futures::prelude::*;
use std::{error, fmt, io};
mod tests;
pub trait NodeHandler {
type InEvent;
type OutEvent;
type Error;
type Substream;
type OutboundOpenInfo;
fn inject_substream(&mut self, substream: Self::Substream, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>);
fn inject_event(&mut self, event: Self::InEvent);
fn poll(&mut self) -> Poll<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>;
}
pub trait IntoNodeHandler<TConnInfo = PeerId> {
type Handler: NodeHandler;
fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler;
}
impl<T, TConnInfo> IntoNodeHandler<TConnInfo> for T
where
T: NodeHandler
{
type Handler = Self;
fn into_handler(self, _: &TConnInfo) -> Self {
self
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NodeHandlerEndpoint<TOutboundOpenInfo> {
Dialer(TOutboundOpenInfo),
Listener,
}
impl<TOutboundOpenInfo> NodeHandlerEndpoint<TOutboundOpenInfo> {
pub fn is_dialer(&self) -> bool {
match self {
NodeHandlerEndpoint::Dialer(_) => true,
NodeHandlerEndpoint::Listener => false,
}
}
pub fn is_listener(&self) -> bool {
match self {
NodeHandlerEndpoint::Dialer(_) => false,
NodeHandlerEndpoint::Listener => true,
}
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
OutboundSubstreamRequest(TOutboundOpenInfo),
Custom(TCustom),
}
impl<TOutboundOpenInfo, TCustom> NodeHandlerEvent<TOutboundOpenInfo, TCustom> {
pub fn map_outbound_open_info<F, I>(self, map: F) -> NodeHandlerEvent<I, TCustom>
where F: FnOnce(TOutboundOpenInfo) -> I
{
match self {
NodeHandlerEvent::OutboundSubstreamRequest(val) => {
NodeHandlerEvent::OutboundSubstreamRequest(map(val))
},
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val),
}
}
pub fn map_custom<F, I>(self, map: F) -> NodeHandlerEvent<TOutboundOpenInfo, I>
where F: FnOnce(TCustom) -> I
{
match self {
NodeHandlerEvent::OutboundSubstreamRequest(val) => {
NodeHandlerEvent::OutboundSubstreamRequest(val)
},
NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)),
}
}
}
pub struct HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
{
node: NodeStream<TMuxer, THandler::OutboundOpenInfo>,
handler: THandler,
}
impl<TMuxer, THandler> fmt::Debug for HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HandledNode")
.field("node", &self.node)
.field("handler", &self.handler)
.finish()
}
}
impl<TMuxer, THandler> HandledNode<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: NodeHandler<Substream = Substream<TMuxer>>,
{
pub fn new(muxer: TMuxer, handler: THandler) -> Self {
HandledNode {
node: NodeStream::new(muxer),
handler,
}
}
pub fn handler(&self) -> &THandler {
&self.handler
}
pub fn handler_mut(&mut self) -> &mut THandler {
&mut self.handler
}
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
}
pub fn is_remote_acknowledged(&self) -> bool {
self.node.is_remote_acknowledged()
}
pub fn close(self) -> Close<TMuxer> {
self.node.close().0
}
pub fn poll(&mut self) -> Poll<THandler::OutEvent, HandledNodeError<THandler::Error>> {
loop {
let mut node_not_ready = false;
match self.node.poll().map_err(HandledNodeError::Node)? {
Async::NotReady => node_not_ready = true,
Async::Ready(NodeEvent::InboundSubstream { substream }) => {
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener)
}
Async::Ready(NodeEvent::OutboundSubstream { user_data, substream }) => {
let endpoint = NodeHandlerEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
}
match self.handler.poll().map_err(HandledNodeError::Handler)? {
Async::NotReady => {
if node_not_ready {
break
}
}
Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(user_data)) => {
self.node.open_substream(user_data);
}
Async::Ready(NodeHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(event));
}
}
}
Ok(Async::NotReady)
}
}
#[derive(Debug)]
pub enum HandledNodeError<THandlerErr> {
Node(io::Error),
Handler(THandlerErr),
}
impl<THandlerErr> fmt::Display for HandledNodeError<THandlerErr>
where
THandlerErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
HandledNodeError::Node(err) => write!(f, "{}", err),
HandledNodeError::Handler(err) => write!(f, "{}", err),
}
}
}
impl<THandlerErr> error::Error for HandledNodeError<THandlerErr>
where
THandlerErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
HandledNodeError::Node(err) => Some(err),
HandledNodeError::Handler(err) => Some(err),
}
}
}