#![cfg(test)]
use super::*;
use assert_matches::assert_matches;
use tokio::runtime::current_thread;
use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode};
use std::{io, marker::PhantomData};
struct TestBuilder {
muxer: DummyMuxer,
handler: Handler,
want_open_substream: bool,
substream_user_data: usize,
}
impl TestBuilder {
fn new() -> Self {
TestBuilder {
muxer: DummyMuxer::new(),
handler: Handler::default(),
want_open_substream: false,
substream_user_data: 0,
}
}
fn with_muxer_inbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_inbound_connection_state(state);
self
}
fn with_muxer_outbound_state(&mut self, state: DummyConnectionState) -> &mut Self {
self.muxer.set_outbound_connection_state(state);
self
}
fn with_handler_state(&mut self, state: HandlerState) -> &mut Self {
self.handler.state = Some(state);
self
}
fn with_open_substream(&mut self, user_data: usize) -> &mut Self {
self.want_open_substream = true;
self.substream_user_data = user_data;
self
}
fn handled_node(&mut self) -> TestHandledNode {
let mut h = HandledNode::new(self.muxer.clone(), self.handler.clone());
if self.want_open_substream {
h.node.get_mut().open_substream(self.substream_user_data).expect("open substream should work");
}
h
}
}
fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_state: HandlerState) {
handled_node.handler.next_outbound_state = Some(next_state);
}
#[test]
fn proper_shutdown() {
struct ShutdownHandler<T> {
did_substream_attempt: bool,
inbound_closed: bool,
substream_attempt_cancelled: bool,
shutdown_called: bool,
marker: PhantomData<T>
}
impl<T> NodeHandler for ShutdownHandler<T> {
type InEvent = ();
type OutEvent = ();
type Substream = T;
type Error = io::Error;
type OutboundOpenInfo = ();
fn inject_substream(&mut self, _: Self::Substream, _: NodeHandlerEndpoint<Self::OutboundOpenInfo>) { panic!() }
fn inject_inbound_closed(&mut self) {
assert!(!self.inbound_closed);
self.inbound_closed = true;
}
fn inject_outbound_closed(&mut self, _: ()) {
assert!(!self.substream_attempt_cancelled);
self.substream_attempt_cancelled = true;
}
fn inject_event(&mut self, _: Self::InEvent) { panic!() }
fn shutdown(&mut self) {
assert!(self.inbound_closed);
assert!(self.substream_attempt_cancelled);
self.shutdown_called = true;
}
fn poll(&mut self) -> Poll<NodeHandlerEvent<(), ()>, io::Error> {
if self.shutdown_called {
Ok(Async::Ready(NodeHandlerEvent::Shutdown))
} else if !self.did_substream_attempt {
self.did_substream_attempt = true;
Ok(Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(())))
} else {
Ok(Async::NotReady)
}
}
}
impl<T> Drop for ShutdownHandler<T> {
fn drop(&mut self) {
if self.did_substream_attempt {
assert!(self.shutdown_called);
}
}
}
let mut muxer = DummyMuxer::new();
muxer.set_inbound_connection_state(DummyConnectionState::Closed);
muxer.set_outbound_connection_state(DummyConnectionState::Closed);
let handled = HandledNode::new(muxer, ShutdownHandler {
did_substream_attempt: false,
inbound_closed: false,
substream_attempt_cancelled: false,
shutdown_called: false,
marker: PhantomData,
});
current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap();
}
#[test]
fn can_inject_event() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.handled_node();
let event = InEvent::Custom("banana");
handled.inject_event(event.clone());
assert_eq!(handled.handler().events, vec![event]);
}
#[test]
fn knows_if_inbound_is_closed() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Ready(None)) .handled_node();
handled.poll().expect("poll failed");
assert!(!handled.is_inbound_open())
}
#[test]
fn knows_if_outbound_is_closed() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Ready(None)) .with_open_substream(987) .handled_node();
handled.poll().expect("poll failed");
assert!(!handled.is_outbound_open());
}
#[test]
fn is_shutting_down_is_true_when_called_shutdown_on_the_handled_node() {
let mut handled = TestBuilder::new()
.with_handler_state(HandlerState::Ready(None)) .handled_node();
assert!(!handled.is_shutting_down());
handled.poll().expect("poll should work");
handled.shutdown();
assert!(handled.is_shutting_down());
}
#[test]
fn is_shutting_down_is_true_when_in_and_outbounds_are_closed() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_open_substream(123) .handled_node();
handled.poll().expect("poll should work");
assert!(handled.is_shutting_down());
}
#[test]
fn is_shutting_down_is_true_when_handler_is_gone() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Pending)
.with_handler_state(HandlerState::Ready(None)) .handled_node();
handled.poll().expect("poll should work");
assert!(handled.is_shutting_down());
}
#[test]
fn is_shutting_down_is_true_when_handler_is_gone_even_if_in_and_outbounds_are_open() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Opened)
.with_muxer_outbound_state(DummyConnectionState::Opened)
.with_open_substream(123)
.with_handler_state(HandlerState::Ready(None))
.handled_node();
handled.poll().expect("poll should work");
assert!(handled.is_shutting_down());
}
#[test]
fn poll_with_unready_node_stream_polls_handler() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_handler_state(HandlerState::Ready(None))
.handled_node();
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
}
#[test]
fn poll_with_unready_node_stream_and_handler_emits_custom_event() {
let expected_event = Some(NodeHandlerEvent::Custom(OutEvent::Custom("pineapple")));
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_handler_state(HandlerState::Ready(expected_event))
.handled_node();
assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => {
assert_matches!(event, OutEvent::Custom("pineapple"))
});
}
#[test]
fn handler_emits_outbound_closed_when_opening_new_substream_on_closed_node() {
let open_event = Some(NodeHandlerEvent::OutboundSubstreamRequest(456));
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Ready(open_event))
.handled_node();
set_next_handler_outbound_state(
&mut handled,
HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear"))))
);
handled.poll().expect("poll works");
assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]);
}
#[test]
fn poll_returns_not_ready_when_node_stream_and_handler_is_not_ready() {
let mut handled = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_open_substream(12)
.with_handler_state(HandlerState::NotReady)
.handled_node();
assert_matches!(handled.poll(), Ok(Async::Ready(None)));
assert_eq!(handled.handler().events, vec![
InEvent::InboundClosed, InEvent::OutboundClosed
]);
}
#[test]
fn poll_yields_inbound_closed_event() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Err) .handled_node();
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![InEvent::InboundClosed]);
}
#[test]
fn poll_yields_outbound_closed_event() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_open_substream(32)
.with_muxer_outbound_state(DummyConnectionState::Closed)
.with_handler_state(HandlerState::Err) .handled_node();
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]);
}
#[test]
fn poll_yields_outbound_substream() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Pending)
.with_muxer_outbound_state(DummyConnectionState::Opened)
.with_open_substream(1)
.with_handler_state(HandlerState::Err) .handled_node();
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![InEvent::Substream(Some(1))]);
}
#[test]
fn poll_yields_inbound_substream() {
let mut h = TestBuilder::new()
.with_muxer_inbound_state(DummyConnectionState::Opened)
.with_muxer_outbound_state(DummyConnectionState::Pending)
.with_handler_state(HandlerState::Err) .handled_node();
assert_eq!(h.handler().events, vec![]);
let _ = h.poll();
assert_eq!(h.handler().events, vec![InEvent::Substream(None)]);
}