#![cfg(test)]
use super::*;
use std::io;
use assert_matches::assert_matches;
use futures::future::{self, FutureResult};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use crate::nodes::handled_node::NodeHandlerEvent;
use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode};
use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use tokio::runtime::Builder;
use tokio::runtime::current_thread::Runtime;
use void::Void;
use crate::PeerId;
type TestNodeTask = NodeTask<
FutureResult<(PeerId, DummyMuxer), io::Error>,
DummyMuxer,
Handler,
InEvent,
OutEvent,
io::Error,
>;
struct NodeTaskTestBuilder {
task_id: TaskId,
inner_node: Option<TestHandledNode>,
inner_fut: Option<FutureResult<(PeerId, DummyMuxer), io::Error>>,
}
impl NodeTaskTestBuilder {
fn new() -> Self {
NodeTaskTestBuilder {
task_id: TaskId(123),
inner_node: None,
inner_fut: {
let peer_id = PeerId::random();
Some(future::ok((peer_id, DummyMuxer::new())))
},
}
}
fn with_inner_fut(&mut self, fut: FutureResult<(PeerId, DummyMuxer), io::Error>) -> &mut Self{
self.inner_fut = Some(fut);
self
}
fn with_task_id(&mut self, id: usize) -> &mut Self {
self.task_id = TaskId(id);
self
}
fn node_task(&mut self) -> (
TestNodeTask,
UnboundedSender<InEvent>,
UnboundedReceiver<(InToExtMessage<OutEvent, Handler, io::Error, io::Error>, TaskId)>,
) {
let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage<OutEvent, Handler, _, _>, TaskId)>();
let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::<InEvent>();
let inner = if self.inner_node.is_some() {
NodeTaskInner::Node(self.inner_node.take().unwrap())
} else {
NodeTaskInner::Future {
future: self.inner_fut.take().unwrap(),
handler: Handler::default(),
events_buffer: Vec::new(),
}
};
let node_task = NodeTask {
inner: inner,
events_tx: events_from_node_task_tx.clone(), in_events_rx: events_to_node_task_rx.fuse(), id: self.task_id,
};
(node_task, events_to_node_task_tx, events_from_node_task_rx)
}
}
type TestHandledNodesTasks = HandledNodesTasks<InEvent, OutEvent, Handler, io::Error, io::Error>;
struct HandledNodeTaskTestBuilder {
muxer: DummyMuxer,
handler: Handler,
task_count: usize,
}
impl HandledNodeTaskTestBuilder {
fn new() -> Self {
HandledNodeTaskTestBuilder {
muxer: DummyMuxer::new(),
handler: Handler::default(),
task_count: 0,
}
}
fn with_tasks(&mut self, amt: usize) -> &mut Self {
self.task_count = amt;
self
}
fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_inbound_connection_state(state);
self
}
fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_outbound_connection_state(state);
self
}
fn with_handler_state(&mut self, state: HandlerState) -> &mut Self {
self.handler.state = Some(state);
self
}
fn with_handler_states(&mut self, states: Vec<HandlerState>) -> &mut Self {
self.handler.next_states = states;
self
}
fn handled_nodes_tasks(&mut self) -> (TestHandledNodesTasks, Vec<TaskId>) {
let mut handled_nodes = HandledNodesTasks::new();
let peer_id = PeerId::random();
let mut task_ids = Vec::new();
for _i in 0..self.task_count {
let fut = future::ok((peer_id.clone(), self.muxer.clone()));
task_ids.push(
handled_nodes.add_reach_attempt(fut, self.handler.clone())
);
}
(handled_nodes, task_ids)
}
}
#[test]
fn task_emits_event_when_things_happen_in_the_node() {
let (node_task, tx, mut rx) = NodeTaskTestBuilder::new()
.with_task_id(890)
.node_task();
tx.unbounded_send(InEvent::Custom("beef")).expect("send to NodeTask should work");
let mut rt = Runtime::new().unwrap();
rt.spawn(node_task);
let events = rt.block_on(rx.by_ref().take(2).collect()).expect("reading on rx should work");
assert_matches!(events[0], (InToExtMessage::NodeReached(_), TaskId(890)));
assert_matches!(events[1], (InToExtMessage::NodeEvent(ref outevent), TaskId(890)) => {
assert_matches!(outevent, OutEvent::Custom(beef) => {
assert_eq!(beef, &"beef");
})
});
}
#[test]
fn task_exits_when_node_errors() {
let mut rt = Runtime::new().unwrap();
let (node_task, _tx, rx) = NodeTaskTestBuilder::new()
.with_inner_fut(future::err(io::Error::new(io::ErrorKind::Other, "nah")))
.with_task_id(345)
.node_task();
rt.spawn(node_task);
let events = rt.block_on(rx.collect()).expect("rx failed");
assert!(events.len() == 1);
assert_matches!(events[0], (InToExtMessage::TaskClosed{..}, TaskId(345)));
}
#[test]
fn task_exits_when_node_is_done() {
let mut rt = Runtime::new().unwrap();
let fut = {
let peer_id = PeerId::random();
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Closed);
muxer.set_outbound_connection_state(DummyConnectionState::Closed);
future::ok((peer_id, muxer))
};
let (node_task, tx, rx) = NodeTaskTestBuilder::new()
.with_inner_fut(fut)
.with_task_id(345)
.node_task();
let create_outbound_substream_event = InEvent::Substream(Some(135));
tx.unbounded_send(create_outbound_substream_event).expect("send msg works");
rt.spawn(node_task);
let events = rt.block_on(rx.collect()).expect("rx failed");
assert_eq!(events.len(), 2);
assert_matches!(events[0].0, InToExtMessage::NodeReached(PeerId{..}));
assert_matches!(events[1].0, InToExtMessage::TaskClosed(Ok(()), _));
}
#[test]
fn query_for_tasks() {
let (mut handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new()
.with_tasks(3)
.handled_nodes_tasks();
assert_eq!(task_ids.len(), 3);
assert_eq!(handled_nodes.task(TaskId(2)).unwrap().id(), task_ids[2]);
assert!(handled_nodes.task(TaskId(545534)).is_none());
}
#[test]
fn send_event_to_task() {
let (mut handled_nodes, _) = HandledNodeTaskTestBuilder::new()
.with_tasks(1)
.handled_nodes_tasks();
let task_id = {
let mut task = handled_nodes.task(TaskId(0)).expect("can fetch a Task");
task.send_event(InEvent::Custom("banana"));
task.id()
};
let mut rt = Builder::new().core_threads(1).build().unwrap();
let mut events = rt.block_on(handled_nodes.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..});
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: event_task_id, event} => {
assert_eq!(event_task_id, task_id);
assert_matches!(event, OutEvent::Custom("banana"));
});
}
#[test]
fn iterate_over_all_tasks() {
let (handled_nodes, task_ids) = HandledNodeTaskTestBuilder::new()
.with_tasks(3)
.handled_nodes_tasks();
let mut tasks: Vec<TaskId> = handled_nodes.tasks().collect();
assert!(tasks.len() == 3);
tasks.sort_by_key(|t| t.0 );
assert_eq!(tasks, task_ids);
}
#[test]
fn add_reach_attempt_prepares_a_new_task() {
let mut handled_nodes = HandledNodesTasks::new();
assert_eq!(handled_nodes.tasks().count(), 0);
assert_eq!(handled_nodes.to_spawn.len(), 0);
handled_nodes.add_reach_attempt( future::empty::<_, Void>(), Handler::default() );
assert_eq!(handled_nodes.tasks().count(), 1);
assert_eq!(handled_nodes.to_spawn.len(), 1);
}
#[test]
fn running_handled_tasks_reaches_the_nodes() {
let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new()
.with_tasks(5)
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Err) .handled_nodes_tasks();
let mut rt = Runtime::new().unwrap();
let mut events: (Option<HandledNodesEvent<_, _, _, _>>, TestHandledNodesTasks);
for i in 0..5 {
events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
assert_matches!(events, (Some(HandledNodesEvent::NodeReached{..}), ref hnt) => {
assert_matches!(hnt, HandledNodesTasks{..});
assert_eq!(hnt.tasks().count(), 5 - i);
});
handled_nodes_tasks = events.1;
events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
assert_matches!(events, (Some(HandledNodesEvent::TaskClosed{..}), _));
handled_nodes_tasks = events.1;
}
}
#[test]
fn events_in_tasks_are_emitted() {
let handler_states = vec![
HandlerState::Err,
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler2") ))),
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler") ))),
];
let (mut handled_nodes_tasks, _) = HandledNodeTaskTestBuilder::new()
.with_tasks(1)
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Opened)
.with_handler_states(handler_states)
.handled_nodes_tasks();
let tx = {
let mut task0 = handled_nodes_tasks.task(TaskId(0)).unwrap();
let tx = task0.inner.get_mut();
tx.clone()
};
let mut rt = Builder::new().core_threads(1).build().unwrap();
let mut events = rt.block_on(handled_nodes_tasks.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeReached{..});
tx.unbounded_send(InEvent::NextState).expect("send works");
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => {
assert_matches!(event, OutEvent::Custom("from handler"));
});
tx.unbounded_send(InEvent::NextState).expect("send works");
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::NodeEvent{id: _, event} => {
assert_matches!(event, OutEvent::Custom("from handler2"));
});
tx.unbounded_send(InEvent::NextState).expect("send works");
events = rt.block_on(events.1.into_future()).unwrap();
assert_matches!(events.0.unwrap(), HandledNodesEvent::TaskClosed{id: _, result, handler: _} => {
assert_matches!(result, Err(_));
});
}