use crate::{
PeerId,
muxing::StreamMuxer,
nodes::{
handled_node::{HandledNode, HandledNodeError, NodeHandler},
node::{Close, Substream}
}
};
use fnv::FnvHashMap;
use futures::{prelude::*, stream, sync::mpsc};
use smallvec::SmallVec;
use std::{
collections::hash_map::{Entry, OccupiedEntry},
error,
fmt,
mem
};
use tokio_executor::Executor;
mod tests;
/// Collection of tasks, each driving one reach attempt and, once it succeeds, the resulting node.
///
/// Tasks are registered with `add_reach_attempt` and report back through an internal channel
/// that is drained by `poll`.
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> {
/// For every task still in the collection: the sender used to deliver messages to the task,
/// and the user data attached to it.
tasks: FnvHashMap<TaskId, (mpsc::UnboundedSender<ExtToInMessage<TInEvent>>, TUserData)>,
/// Identifier that will be handed to the next task added with `add_reach_attempt`.
next_task_id: TaskId,
/// Tasks created by `add_reach_attempt` that have not been handed to an executor yet.
/// Drained at the start of `poll_inner`.
to_spawn: SmallVec<[Box<dyn Future<Item = (), Error = ()> + Send>; 8]>,
/// Tasks polled manually by `poll_inner` because the default executor reported an error
/// status when they were about to be spawned.
local_spawns: Vec<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Sender for task-to-collection messages; a clone is given to every task.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)>,
/// Receiver on which the messages sent by the tasks arrive.
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)>,
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo> fmt::Debug for
    HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
where
    TUserData: fmt::Debug
{
    /// Formats the collection as a map from task id to the user data of that task.
    /// The per-task senders are deliberately left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        let mut map = f.debug_map();
        for (id, (_, user_data)) in self.tasks.iter() {
            map.entry(id, user_data);
        }
        map.finish()
    }
}
/// Reason why a task stopped on its own.
#[derive(Debug)]
pub enum TaskClosedEvent<TReachErr, THandlerErr> {
/// The future passed to `add_reach_attempt` resolved to an error.
Reach(TReachErr),
/// Polling the handled node returned an error after the node had been reached.
Node(HandledNodeError<THandlerErr>),
}
impl<TReachErr, THandlerErr> fmt::Display for TaskClosedEvent<TReachErr, THandlerErr>
where
    TReachErr: fmt::Display,
    THandlerErr: fmt::Display,
{
    /// Displays the wrapped error exactly as the error itself would be displayed.
    ///
    /// The call is delegated to the inner `Display` implementation with the caller's
    /// formatter, rather than going through `write!(f, "{}", ..)`, so that formatting
    /// options requested by the caller (width, fill, alignment, ...) reach the inner error
    /// instead of being silently discarded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TaskClosedEvent::Reach(err) => fmt::Display::fmt(err, f),
            TaskClosedEvent::Node(err) => fmt::Display::fmt(err, f),
        }
    }
}
impl<TReachErr, THandlerErr> error::Error for TaskClosedEvent<TReachErr, THandlerErr>
where
    TReachErr: error::Error + 'static,
    THandlerErr: error::Error + 'static
{
    /// Returns the wrapped error as the underlying cause; both variants always carry one.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = match self {
            TaskClosedEvent::Reach(reach_err) => reach_err,
            TaskClosedEvent::Node(node_err) => node_err,
        };
        Some(cause)
    }
}
/// Something that can be turned into a `NodeHandler` once the connection information of the
/// remote is available.
pub trait IntoNodeHandler<TConnInfo = PeerId> {
/// The node handler produced by `into_handler`.
type Handler: NodeHandler;
/// Consumes `self` and builds the handler, given the connection information of the remote.
fn into_handler(self, remote_conn_info: &TConnInfo) -> Self::Handler;
}
impl<T, TConnInfo> IntoNodeHandler<TConnInfo> for T
where
    T: NodeHandler,
{
    type Handler = T;

    /// Every `NodeHandler` is trivially convertible into itself; the connection information
    /// is not looked at.
    #[inline]
    fn into_handler(self, _remote_conn_info: &TConnInfo) -> Self::Handler {
        self
    }
}
/// Event returned by `HandledNodesTasks::poll`.
#[derive(Debug)]
pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo = PeerId> {
/// A task stopped by itself. Its entry has already been removed from the collection.
TaskClosed {
/// The task that was removed, together with its user data.
task: ClosedTask<TInEvent, TUserData>,
/// Why the task stopped.
result: TaskClosedEvent<TReachErr, THandlerErr>,
/// The handler originally passed to `add_reach_attempt`. It is `Some` when the reach
/// attempt failed and `None` when the node errored after having been reached.
handler: Option<TIntoHandler>,
},
/// The reach attempt of a task succeeded.
NodeReached {
/// The task whose reach attempt succeeded.
task: Task<'a, TInEvent, TUserData>,
/// Connection information yielded by the reach-attempt future.
conn_info: TConnInfo,
},
/// The node driven by a task produced an event.
NodeEvent {
/// The task that produced the event.
task: Task<'a, TInEvent, TUserData>,
/// The event produced by the node.
event: TOutEvent,
},
}
/// Identifier of a task inside a `HandledNodesTasks`.
///
/// Wraps a counter that starts at 0 and is incremented each time a task is added.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(usize);
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>
{
#[inline]
pub fn new() -> Self {
let (events_tx, events_rx) = mpsc::unbounded();
HandledNodesTasks {
tasks: Default::default(),
next_task_id: TaskId(0),
to_spawn: SmallVec::new(),
local_spawns: Vec::new(),
events_tx,
events_rx,
}
}
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, user_data: TUserData, handler: TIntoHandler) -> TaskId
where
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
TIntoHandler: IntoNodeHandler<TConnInfo> + Send + 'static,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
TReachErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
<TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::unbounded();
self.tasks.insert(task_id, (tx, user_data));
let task = Box::new(NodeTask {
taken_over: SmallVec::new(),
inner: NodeTaskInner::Future {
future,
handler,
events_buffer: Vec::new(),
},
events_tx: self.events_tx.clone(),
in_events_rx: rx.fuse(),
id: task_id,
});
self.to_spawn.push(task);
task_id
}
pub fn broadcast_event(&mut self, event: &TInEvent)
where TInEvent: Clone,
{
for (sender, _) in self.tasks.values() {
let _ = sender.unbounded_send(ExtToInMessage::HandlerEvent(event.clone()));
}
}
#[inline]
pub fn task(&mut self, id: TaskId) -> Option<Task<'_, TInEvent, TUserData>> {
match self.tasks.entry(id) {
Entry::Occupied(inner) => Some(Task { inner }),
Entry::Vacant(_) => None,
}
}
#[inline]
pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a {
self.tasks.keys().cloned()
}
pub fn poll(&mut self) -> Async<HandledNodesEvent<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TUserData, TConnInfo>> {
let (message, task_id) = match self.poll_inner() {
Async::Ready(r) => r,
Async::NotReady => return Async::NotReady,
};
Async::Ready(match message {
InToExtMessage::NodeEvent(event) => {
HandledNodesEvent::NodeEvent {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => Task { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
event
}
},
InToExtMessage::NodeReached(conn_info) => {
HandledNodesEvent::NodeReached {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => Task { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
conn_info
}
},
InToExtMessage::TaskClosed(result, handler) => {
let (channel, user_data) = self.tasks.remove(&task_id)
.expect("poll_inner only returns valid TaskIds; QED");
HandledNodesEvent::TaskClosed {
task: ClosedTask {
id: task_id,
channel,
user_data,
},
result,
handler,
}
},
})
}
fn poll_inner(&mut self) -> Async<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo>, TaskId)> {
for to_spawn in self.to_spawn.drain() {
let mut executor = tokio_executor::DefaultExecutor::current();
if executor.status().is_ok() {
executor.spawn(to_spawn).expect("failed to create a node task");
} else {
self.local_spawns.push(to_spawn);
}
}
for n in (0..self.local_spawns.len()).rev() {
let mut task = self.local_spawns.swap_remove(n);
match task.poll() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => self.local_spawns.push(task),
Err(_err) => ()
}
}
loop {
match self.events_rx.poll() {
Ok(Async::Ready(Some((message, task_id)))) => {
if self.tasks.contains_key(&task_id) {
break Async::Ready((message, task_id));
}
}
Ok(Async::NotReady) => {
break Async::NotReady;
}
Ok(Async::Ready(None)) => {
unreachable!("The sender is in self as well, therefore the receiver never \
closes.")
},
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
}
}
/// Access to a task that is still part of a `HandledNodesTasks`.
pub struct Task<'a, TInEvent, TUserData> {
/// Occupied entry of the collection's task map for this task.
inner: OccupiedEntry<'a, TaskId, (mpsc::UnboundedSender<ExtToInMessage<TInEvent>>, TUserData)>,
}
impl<'a, TInEvent, TUserData> Task<'a, TInEvent, TUserData> {
    /// Sends an event to the task. A failure to send is ignored.
    #[inline]
    pub fn send_event(&mut self, event: TInEvent) {
        let (sender, _) = self.inner.get_mut();
        let _ = sender.unbounded_send(ExtToInMessage::HandlerEvent(event));
    }

    /// Returns the user data attached to the task.
    pub fn user_data(&self) -> &TUserData {
        let (_, user_data) = self.inner.get();
        user_data
    }

    /// Returns the user data attached to the task, mutably.
    pub fn user_data_mut(&mut self) -> &mut TUserData {
        let (_, user_data) = self.inner.get_mut();
        user_data
    }

    /// Returns the identifier of the task.
    #[inline]
    pub fn id(&self) -> TaskId {
        let &id = self.inner.key();
        id
    }

    /// Removes the task from the collection and returns what was stored for it.
    pub fn close(self) -> ClosedTask<TInEvent, TUserData> {
        let (id, (channel, user_data)) = self.inner.remove_entry();
        ClosedTask { id, channel, user_data }
    }

    /// Passes the sender of the closed task `other` to this task and returns the user data
    /// of `other`. A failure to send is ignored.
    pub fn take_over(&mut self, other: ClosedTask<TInEvent, TUserData>) -> TUserData {
        let ClosedTask { channel, user_data, .. } = other;
        let (sender, _) = self.inner.get_mut();
        let _ = sender.unbounded_send(ExtToInMessage::TakeOver(channel));
        user_data
    }
}
impl<'a, TInEvent, TUserData> fmt::Debug for Task<'a, TInEvent, TUserData>
where
    TUserData: fmt::Debug,
{
    /// Formats the task as a `Task` tuple made of its identifier and its user data.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        let id = self.id();
        let mut tuple = f.debug_tuple("Task");
        tuple.field(&id);
        tuple.field(self.user_data());
        tuple.finish()
    }
}
/// A task that has been removed from its `HandledNodesTasks`.
pub struct ClosedTask<TInEvent, TUserData> {
/// Identifier the task had in the collection.
id: TaskId,
/// Sender half of the channel towards the task. It is handed to another task by
/// `Task::take_over`.
channel: mpsc::UnboundedSender<ExtToInMessage<TInEvent>>,
/// User data that was attached to the task.
user_data: TUserData,
}
impl<TInEvent, TUserData> ClosedTask<TInEvent, TUserData> {
#[inline]
pub fn id(&self) -> TaskId {
self.id
}
pub fn user_data(&self) -> &TUserData {
&self.user_data
}
pub fn user_data_mut(&mut self) -> &mut TUserData {
&mut self.user_data
}
pub fn into_user_data(self) -> TUserData {
self.user_data
}
}
impl<TInEvent, TUserData> fmt::Debug for ClosedTask<TInEvent, TUserData>
where
TUserData: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_tuple("ClosedTask")
.field(&self.id)
.field(&self.user_data)
.finish()
}
}
/// Message sent from the collection to a task.
#[derive(Debug)]
enum ExtToInMessage<TInEvent> {
/// An event for the handler: buffered while the node is being reached, injected into the
/// node afterwards.
HandlerEvent(TInEvent),
/// Sender of another, closed task. The receiving task stores it until its node reports
/// `is_remote_acknowledged()`.
TakeOver(mpsc::UnboundedSender<ExtToInMessage<TInEvent>>),
}
/// Message sent from a task to the collection.
#[derive(Debug)]
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TConnInfo> {
/// The reach attempt succeeded; carries the connection information it yielded.
NodeReached(TConnInfo),
/// The task has stopped. The handler is given back when the reach attempt failed.
TaskClosed(TaskClosedEvent<TReachErr, THandlerErr>, Option<TIntoHandler>),
/// The node produced an event.
NodeEvent(TOutEvent),
}
/// Future run in the background for each task: drives the reach attempt, then the node.
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo>
where
TMuxer: StreamMuxer,
TIntoHandler: IntoNodeHandler<TConnInfo>,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// Sender used to report messages, tagged with `id`, to the collection.
events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TConnInfo>, TaskId)>,
/// Receiver (fused) of the messages coming from the collection.
in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<ExtToInMessage<TInEvent>>>,
/// Current state of the task.
inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TConnInfo>,
/// Identifier of the task, attached to every message sent on `events_tx`.
id: TaskId,
/// Senders received through `TakeOver` messages; cleared once the node reports
/// `is_remote_acknowledged()`.
taken_over: SmallVec<[mpsc::UnboundedSender<ExtToInMessage<TInEvent>>; 1]>,
}
/// State of a `NodeTask`.
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TConnInfo>
where
TMuxer: StreamMuxer,
TIntoHandler: IntoNodeHandler<TConnInfo>,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
/// The reach attempt is still in progress.
Future {
/// The future of the reach attempt.
future: TFut,
/// Value that will be turned into the node handler once the node is reached.
handler: TIntoHandler,
/// Handler events received so far; they are injected into the node once it exists.
events_buffer: Vec<TInEvent>,
},
/// The node has been reached and is being driven.
Node(HandledNode<TMuxer, TIntoHandler::Handler>),
/// The node is being closed because the channel from the collection has ended.
Closing(Close<TMuxer>),
/// Placeholder put in place while `poll` works on the real state. Finding it at the start
/// of `poll` makes the task panic.
Poisoned,
}
// Drives one task through its states: reach attempt (`Future`), handled node (`Node`) and,
// when the channel from the collection ends while the node is alive, `Closing`.
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo> Future for
NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TConnInfo>
where
TMuxer: StreamMuxer,
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr>,
TIntoHandler: IntoNodeHandler<TConnInfo>,
TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
{
type Item = ();
type Error = ();
/// Polls the task.
///
/// Never returns `Err`: failures of the reach attempt or of the node are reported to the
/// collection as `TaskClosed` messages, after which the future resolves to `Ready(())`.
///
/// # Panics
///
/// Panics if the state is `Poisoned`, i.e. if this is polled again after it has finished
/// or after an earlier poll panicked.
fn poll(&mut self) -> Poll<(), ()> {
'outer_loop: loop {
// Take the state out, leaving `Poisoned` behind. Every path that keeps the task alive
// stores a real state back before returning `NotReady` or looping again.
match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) {
// State 1: the node is still being reached.
NodeTaskInner::Future { mut future, handler, mut events_buffer } => {
// Drain the messages coming from the collection first.
loop {
match self.in_events_rx.poll() {
// The channel from the collection has ended: the task finishes without
// reporting anything.
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
// No node yet, so handler events are buffered for later.
Ok(Async::Ready(Some(ExtToInMessage::HandlerEvent(event)))) => {
events_buffer.push(event)
},
Ok(Async::Ready(Some(ExtToInMessage::TakeOver(take_over)))) => {
self.taken_over.push(take_over);
},
Ok(Async::NotReady) => break,
Err(_) => unreachable!("An UnboundedReceiver never errors"),
}
}
// Then check on the reach attempt itself.
match future.poll() {
Ok(Async::Ready((conn_info, muxer))) => {
// Reached: build the node, replay the buffered events into it, report
// `NodeReached`, then loop again so the node is polled right away.
let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info));
let event = InToExtMessage::NodeReached(conn_info);
for event in events_buffer {
node.inject_event(event);
}
// The result of the send is deliberately ignored.
let _ = self.events_tx.unbounded_send((event, self.id));
self.inner = NodeTaskInner::Node(node);
}
Ok(Async::NotReady) => {
// Nothing new: put the state back and wait.
self.inner = NodeTaskInner::Future { future, handler, events_buffer };
return Ok(Async::NotReady);
},
Err(err) => {
// The reach attempt failed: report it, hand the unused handler back, and
// finish the task.
let event = InToExtMessage::TaskClosed(TaskClosedEvent::Reach(err), Some(handler));
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(()));
}
}
},
// State 2: the node has been reached and is being driven.
NodeTaskInner::Node(mut node) => {
// Only poll the fused receiver if it has not already reported that it is done.
if !self.in_events_rx.is_done() {
loop {
match self.in_events_rx.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(ExtToInMessage::HandlerEvent(event)))) => {
node.inject_event(event)
},
Ok(Async::Ready(Some(ExtToInMessage::TakeOver(take_over)))) => {
self.taken_over.push(take_over);
},
Ok(Async::Ready(None)) => {
// The channel from the collection has ended: start closing the node
// and process the new state immediately.
self.inner = NodeTaskInner::Closing(node.close());
continue 'outer_loop;
}
Err(()) => unreachable!("An unbounded receiver never errors"),
}
}
}
// Drive the node until it has nothing more to report.
loop {
// Once the node reports the remote as acknowledged, the senders of the taken-over
// tasks are dropped.
if !self.taken_over.is_empty() && node.is_remote_acknowledged() {
self.taken_over.clear();
}
match node.poll() {
Ok(Async::NotReady) => {
self.inner = NodeTaskInner::Node(node);
return Ok(Async::NotReady);
},
Ok(Async::Ready(event)) => {
// Forward the event to the collection and keep polling the node.
let event = InToExtMessage::NodeEvent(event);
let _ = self.events_tx.unbounded_send((event, self.id));
}
Err(err) => {
// The node failed: report it (there is no handler to give back) and finish.
let event = InToExtMessage::TaskClosed(TaskClosedEvent::Node(err), None);
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(()));
}
}
}
},
// State 3: the node is being closed.
NodeTaskInner::Closing(mut closing) => {
match closing.poll() {
// Success and failure of the close both end the task; the error is dropped.
Ok(Async::Ready(())) | Err(_) => {
return Ok(Async::Ready(()));
},
Ok(Async::NotReady) => {
self.inner = NodeTaskInner::Closing(closing);
return Ok(Async::NotReady);
}
}
},
// The state was never restored: the task already finished or panicked in an earlier poll.
NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier")
}
}
}
}