use crate::{
muxing::StreamMuxer,
nodes::{
handled_node::{HandledNode, IntoNodeHandler, NodeHandler},
node::{Close, Substream}
}
};
use futures::{prelude::*, stream, sync::mpsc};
use smallvec::SmallVec;
use super::{TaskId, Error};
#[derive(Debug)]
pub enum ToTaskMessage<T> {
HandlerEvent(T),
TakeOver(mpsc::Sender<ToTaskMessage<T>>)
}
#[derive(Debug)]
pub enum FromTaskMessage<T, H, E, HE, C> {
NodeReached(C),
TaskClosed(Error<E, HE>, Option<H>),
NodeEvent(T)
}
pub struct Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
id: TaskId,
sender: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
receiver: stream::Fuse<mpsc::Receiver<ToTaskMessage<I>>>,
state: State<F, M, H, I, O, E, C>,
taken_over: SmallVec<[mpsc::Sender<ToTaskMessage<I>>; 1]>
}
impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
pub fn new (
i: TaskId,
s: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
r: mpsc::Receiver<ToTaskMessage<I>>,
f: F,
h: H
) -> Self {
Task {
id: i,
sender: s,
receiver: r.fuse(),
state: State::Future { future: f, handler: h, events_buffer: Vec::new() },
taken_over: SmallVec::new()
}
}
pub fn node (
i: TaskId,
s: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
r: mpsc::Receiver<ToTaskMessage<I>>,
n: HandledNode<M, H::Handler>
) -> Self {
Task {
id: i,
sender: s,
receiver: r.fuse(),
state: State::Node(n),
taken_over: SmallVec::new()
}
}
}
enum State<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
Future {
future: F,
handler: H,
events_buffer: Vec<I>
},
SendEvent {
node: Option<HandledNode<M, H::Handler>>,
event: FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>
},
PollComplete(Option<HandledNode<M, H::Handler>>, bool),
Node(HandledNode<M, H::Handler>),
Closing(Close<M>),
Undefined
}
impl<F, M, H, I, O, E, C> Future for Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
F: Future<Item = (C, M), Error = E>,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
'poll: loop {
match std::mem::replace(&mut self.state, State::Undefined) {
State::Future { mut future, handler, mut events_buffer } => {
loop {
match self.receiver.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) =>
events_buffer.push(event),
Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) =>
self.taken_over.push(take_over),
Err(()) => unreachable!("An `mpsc::Receiver` does not error.")
}
}
match future.poll() {
Ok(Async::Ready((conn_info, muxer))) => {
let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info));
for event in events_buffer {
node.inject_event(event)
}
self.state = State::SendEvent {
node: Some(node),
event: FromTaskMessage::NodeReached(conn_info)
}
}
Ok(Async::NotReady) => {
self.state = State::Future { future, handler, events_buffer };
return Ok(Async::NotReady)
}
Err(e) => {
let event = FromTaskMessage::TaskClosed(Error::Reach(e), Some(handler));
self.state = State::SendEvent { node: None, event }
}
}
}
State::Node(mut node) => {
loop {
match self.receiver.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) =>
node.inject_event(event),
Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) =>
self.taken_over.push(take_over),
Ok(Async::Ready(None)) => {
self.state = State::Closing(node.close());
continue 'poll
}
Err(()) => unreachable!("An `mpsc::Receiver` does not error.")
}
}
loop {
if !self.taken_over.is_empty() && node.is_remote_acknowledged() {
self.taken_over.clear()
}
match node.poll() {
Ok(Async::NotReady) => {
self.state = State::Node(node);
return Ok(Async::NotReady)
}
Ok(Async::Ready(event)) => {
self.state = State::SendEvent {
node: Some(node),
event: FromTaskMessage::NodeEvent(event)
};
continue 'poll
}
Err(err) => {
let event = FromTaskMessage::TaskClosed(Error::Node(err), None);
self.state = State::SendEvent { node: None, event };
continue 'poll
}
}
}
}
State::SendEvent { mut node, event } => {
loop {
match self.receiver.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) =>
if let Some(ref mut n) = node {
n.inject_event(event)
}
Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) =>
self.taken_over.push(take_over),
Ok(Async::Ready(None)) =>
if let Some(n) = node {
self.state = State::Closing(n.close());
continue 'poll
} else {
return Ok(Async::Ready(())) }
Err(()) => unreachable!("An `mpsc::Receiver` does not error.")
}
}
let close =
if let FromTaskMessage::TaskClosed(..) = event {
true
} else {
false
};
match self.sender.start_send((event, self.id)) {
Ok(AsyncSink::NotReady((event, _))) => {
self.state = State::SendEvent { node, event };
return Ok(Async::NotReady)
}
Ok(AsyncSink::Ready) => self.state = State::PollComplete(node, close),
Err(_) => {
if let Some(n) = node {
self.state = State::Closing(n.close());
continue 'poll
}
return Ok(Async::Ready(()))
}
}
}
State::PollComplete(mut node, close) => {
loop {
match self.receiver.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(ToTaskMessage::HandlerEvent(event)))) =>
if let Some(ref mut n) = node {
n.inject_event(event)
}
Ok(Async::Ready(Some(ToTaskMessage::TakeOver(take_over)))) =>
self.taken_over.push(take_over),
Ok(Async::Ready(None)) =>
if let Some(n) = node {
self.state = State::Closing(n.close());
continue 'poll
} else {
return Ok(Async::Ready(())) }
Err(()) => unreachable!("An `mpsc::Receiver` does not error.")
}
}
match self.sender.poll_complete() {
Ok(Async::NotReady) => {
self.state = State::PollComplete(node, close);
return Ok(Async::NotReady)
}
Ok(Async::Ready(())) =>
if let Some(n) = node {
if close {
self.state = State::Closing(n.close())
} else {
self.state = State::Node(n)
}
} else {
assert!(close);
return Ok(Async::Ready(()))
}
Err(_) => {
if let Some(n) = node {
self.state = State::Closing(n.close());
continue 'poll
}
return Ok(Async::Ready(()))
}
}
}
State::Closing(mut closing) =>
match closing.poll() {
Ok(Async::Ready(())) | Err(_) =>
return Ok(Async::Ready(())), Ok(Async::NotReady) => {
self.state = State::Closing(closing);
return Ok(Async::NotReady)
}
}
State::Undefined => panic!("`Task::poll()` called after completion.")
}
}
}
}