use crate::{
muxing::StreamMuxer,
nodes::{
handled_node::{HandledNode, IntoNodeHandler, NodeHandler},
node::{Close, Substream}
}
};
use futures::{prelude::*, channel::mpsc, stream};
use smallvec::SmallVec;
use std::{pin::Pin, task::Context, task::Poll};
use super::{TaskId, Error};
#[derive(Debug)]
pub enum ToTaskMessage<T> {
HandlerEvent(T),
TakeOver(mpsc::Sender<ToTaskMessage<T>>)
}
#[derive(Debug)]
pub enum FromTaskMessage<T, H, E, HE, C> {
NodeReached(C),
TaskClosed(Error<E, HE>, Option<H>),
NodeEvent(T)
}
pub struct Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
id: TaskId,
sender: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
receiver: stream::Fuse<mpsc::Receiver<ToTaskMessage<I>>>,
state: State<F, M, H, I, O, E, C>,
taken_over: SmallVec<[mpsc::Sender<ToTaskMessage<I>>; 1]>
}
impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
pub fn new (
i: TaskId,
s: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
r: mpsc::Receiver<ToTaskMessage<I>>,
f: F,
h: H
) -> Self {
Task {
id: i,
sender: s,
receiver: r.fuse(),
state: State::Future { future: Box::pin(f), handler: h, events_buffer: Vec::new() },
taken_over: SmallVec::new()
}
}
pub fn node (
i: TaskId,
s: mpsc::Sender<(FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>, TaskId)>,
r: mpsc::Receiver<ToTaskMessage<I>>,
n: HandledNode<M, H::Handler>
) -> Self {
Task {
id: i,
sender: s,
receiver: r.fuse(),
state: State::Node(n),
taken_over: SmallVec::new()
}
}
}
enum State<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
Future {
future: Pin<Box<F>>,
handler: H,
events_buffer: Vec<I>
},
SendEvent {
node: Option<HandledNode<M, H::Handler>>,
event: FromTaskMessage<O, H, E, <H::Handler as NodeHandler>::Error, C>
},
Node(HandledNode<M, H::Handler>),
Closing(Close<M>),
Undefined
}
impl<F, M, H, I, O, E, C> Unpin for Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>>
{
}
impl<F, M, H, I, O, E, C> Future for Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
F: Future<Output = Result<(C, M), E>>,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
let this = &mut *self;
'poll: loop {
match std::mem::replace(&mut this.state, State::Undefined) {
State::Future { mut future, handler, mut events_buffer } => {
loop {
match Stream::poll_next(Pin::new(&mut this.receiver), cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(ToTaskMessage::HandlerEvent(event))) =>
events_buffer.push(event),
Poll::Ready(Some(ToTaskMessage::TakeOver(take_over))) =>
this.taken_over.push(take_over),
}
}
match Future::poll(Pin::new(&mut future), cx) {
Poll::Ready(Ok((conn_info, muxer))) => {
let mut node = HandledNode::new(muxer, handler.into_handler(&conn_info));
for event in events_buffer {
node.inject_event(event)
}
this.state = State::SendEvent {
node: Some(node),
event: FromTaskMessage::NodeReached(conn_info)
}
}
Poll::Pending => {
this.state = State::Future { future, handler, events_buffer };
return Poll::Pending
}
Poll::Ready(Err(e)) => {
let event = FromTaskMessage::TaskClosed(Error::Reach(e), Some(handler));
this.state = State::SendEvent { node: None, event }
}
}
}
State::Node(mut node) => {
loop {
match Stream::poll_next(Pin::new(&mut this.receiver), cx) {
Poll::Pending => break,
Poll::Ready(Some(ToTaskMessage::HandlerEvent(event))) =>
node.inject_event(event),
Poll::Ready(Some(ToTaskMessage::TakeOver(take_over))) =>
this.taken_over.push(take_over),
Poll::Ready(None) => {
this.state = State::Closing(node.close());
continue 'poll
}
}
}
loop {
if !this.taken_over.is_empty() && node.is_remote_acknowledged() {
this.taken_over.clear()
}
match HandledNode::poll(Pin::new(&mut node), cx) {
Poll::Pending => {
this.state = State::Node(node);
return Poll::Pending
}
Poll::Ready(Ok(event)) => {
this.state = State::SendEvent {
node: Some(node),
event: FromTaskMessage::NodeEvent(event)
};
continue 'poll
}
Poll::Ready(Err(err)) => {
let event = FromTaskMessage::TaskClosed(Error::Node(err), None);
this.state = State::SendEvent { node: None, event };
continue 'poll
}
}
}
}
State::SendEvent { mut node, event } => {
loop {
match Stream::poll_next(Pin::new(&mut this.receiver), cx) {
Poll::Pending => break,
Poll::Ready(Some(ToTaskMessage::HandlerEvent(event))) =>
if let Some(ref mut n) = node {
n.inject_event(event)
}
Poll::Ready(Some(ToTaskMessage::TakeOver(take_over))) =>
this.taken_over.push(take_over),
Poll::Ready(None) =>
if let Some(n) = node {
this.state = State::Closing(n.close());
continue 'poll
} else {
return Poll::Ready(()) }
}
}
let close =
if let FromTaskMessage::TaskClosed(..) = event {
true
} else {
false
};
match this.sender.poll_ready(cx) {
Poll::Pending => {
self.state = State::SendEvent { node, event };
return Poll::Pending
}
Poll::Ready(Ok(())) => {
let _ = this.sender.start_send((event, this.id));
if let Some(n) = node {
if close {
this.state = State::Closing(n.close())
} else {
this.state = State::Node(n)
}
} else {
assert!(close);
return Poll::Ready(())
}
},
Poll::Ready(Err(_)) => {
if let Some(n) = node {
this.state = State::Closing(n.close());
continue 'poll
}
return Poll::Ready(())
}
}
}
State::Closing(mut closing) =>
match Future::poll(Pin::new(&mut closing), cx) {
Poll::Ready(_) =>
return Poll::Ready(()), Poll::Pending => {
this.state = State::Closing(closing);
return Poll::Pending
}
}
State::Undefined => panic!("`Task::poll()` called after completion.")
}
}
}
}