use crate::{
Executor, PeerId,
muxing::StreamMuxer,
nodes::{
handled_node::{HandledNode, IntoNodeHandler, NodeHandler},
node::Substream
}
};
use fnv::FnvHashMap;
use futures::{prelude::*, channel::mpsc, stream::FuturesUnordered};
use std::{collections::hash_map::{Entry, OccupiedEntry}, error, fmt, pin::Pin, task::Context, task::Poll};
use super::{TaskId, task::{Task, FromTaskMessage, ToTaskMessage}, Error};
pub struct Manager<I, O, H, E, HE, T, C = PeerId> {
tasks: FnvHashMap<TaskId, TaskInfo<I, T>>,
next_task_id: TaskId,
executor: Option<Box<dyn Executor + Send>>,
local_spawns: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
events_tx: mpsc::Sender<(FromTaskMessage<O, H, E, HE, C>, TaskId)>,
events_rx: mpsc::Receiver<(FromTaskMessage<O, H, E, HE, C>, TaskId)>
}
impl<I, O, H, E, HE, T, C> fmt::Debug for Manager<I, O, H, E, HE, T, C>
where
T: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_map()
.entries(self.tasks.iter().map(|(id, task)| (id, &task.user_data)))
.finish()
}
}
struct TaskInfo<I, T> {
sender: mpsc::Sender<ToTaskMessage<I>>,
user_data: T,
}
#[derive(Debug)]
pub enum Event<'a, I, O, H, E, HE, T, C = PeerId> {
TaskClosed {
task: ClosedTask<I, T>,
result: Error<E, HE>,
handler: Option<H>
},
NodeReached {
task: TaskEntry<'a, I, T>,
conn_info: C
},
NodeEvent {
task: TaskEntry<'a, I, T>,
event: O
}
}
impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
pub fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
tasks: FnvHashMap::default(),
next_task_id: TaskId(0),
executor,
local_spawns: FuturesUnordered::new(),
events_tx: tx,
events_rx: rx
}
}
pub fn add_reach_attempt<F, M>(&mut self, future: F, user_data: T, handler: H) -> TaskId
where
F: Future<Output = Result<(C, M), E>> + Send + 'static,
H: IntoNodeHandler<C> + Send + 'static,
H::Handler: NodeHandler<Substream = Substream<M>, InEvent = I, OutEvent = O, Error = HE> + Send + 'static,
E: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
I: Send + 'static,
O: Send + 'static,
<H::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
C: Send + 'static
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::channel(4);
self.tasks.insert(task_id, TaskInfo { sender: tx, user_data });
let task = Box::pin(Task::new(task_id, self.events_tx.clone(), rx, future, handler));
if let Some(executor) = &self.executor {
executor.exec(task as Pin<Box<_>>)
} else {
self.local_spawns.push(task);
}
task_id
}
pub fn add_connection<M, Handler>(&mut self, user_data: T, muxer: M, handler: Handler) -> TaskId
where
H: IntoNodeHandler<C, Handler = Handler> + Send + 'static,
Handler: NodeHandler<Substream = Substream<M>, InEvent = I, OutEvent = O, Error = HE> + Send + 'static,
E: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
I: Send + 'static,
O: Send + 'static,
<H::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
C: Send + 'static
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::channel(4);
self.tasks.insert(task_id, TaskInfo { sender: tx, user_data });
let task: Task<Pin<Box<futures::future::Pending<_>>>, _, _, _, _, _, _> =
Task::node(task_id, self.events_tx.clone(), rx, HandledNode::new(muxer, handler));
if let Some(executor) = &self.executor {
executor.exec(Box::pin(task))
} else {
self.local_spawns.push(Box::pin(task));
}
task_id
}
#[must_use]
pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
where
I: Clone
{
for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) {
return Poll::Pending;
}
}
for task in self.tasks.values_mut() {
let msg = ToTaskMessage::HandlerEvent(event.clone());
match task.sender.start_send(msg) {
Ok(()) => {},
Err(ref err) if err.is_full() =>
panic!("poll_ready returned Poll::Ready just above; qed"),
Err(_) => {},
}
}
Poll::Ready(())
}
pub fn task(&mut self, id: TaskId) -> Option<TaskEntry<'_, I, T>> {
match self.tasks.entry(id) {
Entry::Occupied(inner) => Some(TaskEntry { inner }),
Entry::Vacant(_) => None,
}
}
pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a {
self.tasks.keys().cloned()
}
pub fn poll(&mut self, cx: &mut Context) -> Poll<Event<I, O, H, E, HE, T, C>> {
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
let (message, task_id) = loop {
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
Poll::Ready(Some((message, task_id))) => {
if self.tasks.contains_key(&task_id) {
break (message, task_id)
}
}
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => unreachable!("sender and receiver have same lifetime"),
}
};
Poll::Ready(match message {
FromTaskMessage::NodeEvent(event) =>
Event::NodeEvent {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => TaskEntry { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
event
},
FromTaskMessage::NodeReached(conn_info) =>
Event::NodeReached {
task: match self.tasks.entry(task_id) {
Entry::Occupied(inner) => TaskEntry { inner },
Entry::Vacant(_) => panic!("poll_inner only returns valid TaskIds; QED")
},
conn_info
},
FromTaskMessage::TaskClosed(result, handler) => {
let entry = self.tasks.remove(&task_id)
.expect("poll_inner only returns valid TaskIds; QED");
Event::TaskClosed {
task: ClosedTask::new(task_id, entry.sender, entry.user_data),
result,
handler
}
}
})
}
}
pub struct TaskEntry<'a, E, T> {
inner: OccupiedEntry<'a, TaskId, TaskInfo<E, T>>
}
impl<'a, E, T> TaskEntry<'a, E, T> {
pub fn start_send_event(&mut self, event: E) {
let msg = ToTaskMessage::HandlerEvent(event);
self.start_send_event_msg(msg);
}
pub fn poll_ready_event(&mut self, cx: &mut Context) -> Poll<()> {
self.poll_ready_event_msg(cx)
}
pub fn user_data(&self) -> &T {
&self.inner.get().user_data
}
pub fn user_data_mut(&mut self) -> &mut T {
&mut self.inner.get_mut().user_data
}
pub fn id(&self) -> TaskId {
*self.inner.key()
}
pub fn close(self) -> ClosedTask<E, T> {
let id = *self.inner.key();
let task = self.inner.remove();
ClosedTask::new(id, task.sender, task.user_data)
}
pub fn start_take_over(&mut self, t: ClosedTask<E, T>) {
self.start_send_event_msg(ToTaskMessage::TakeOver(t.sender));
}
pub fn poll_ready_take_over(&mut self, cx: &mut Context) -> Poll<()> {
self.poll_ready_event_msg(cx)
}
fn start_send_event_msg(&mut self, msg: ToTaskMessage<E>) {
match self.inner.get_mut().sender.start_send(msg) {
Ok(()) => {},
Err(ref err) if err.is_full() => {}, Err(_) => {},
}
}
fn poll_ready_event_msg(&mut self, cx: &mut Context) -> Poll<()> {
let task = self.inner.get_mut();
task.sender.poll_ready(cx).map(|_| ())
}
}
impl<E, T: fmt::Debug> fmt::Debug for TaskEntry<'_, E, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("TaskEntry")
.field(&self.id())
.field(self.user_data())
.finish()
}
}
pub struct ClosedTask<E, T> {
id: TaskId,
sender: mpsc::Sender<ToTaskMessage<E>>,
user_data: T
}
impl<E, T> ClosedTask<E, T> {
fn new(id: TaskId, sender: mpsc::Sender<ToTaskMessage<E>>, user_data: T) -> Self {
Self { id, sender, user_data }
}
pub fn id(&self) -> TaskId {
self.id
}
pub fn user_data(&self) -> &T {
&self.user_data
}
pub fn user_data_mut(&mut self) -> &mut T {
&mut self.user_data
}
pub fn into_user_data(self) -> T {
self.user_data
}
}
impl<E, T: fmt::Debug> fmt::Debug for ClosedTask<E, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("ClosedTask")
.field(&self.id)
.field(&self.user_data)
.finish()
}
}