use futures::prelude::*;
use libp2p_core::{identity, muxing::StreamMuxerBox, upgrade, Transport};
use libp2p_core::nodes::{Network, NetworkEvent, Peer};
use libp2p_core::nodes::network::IncomingError;
use libp2p_swarm::{
NegotiatedSubstream,
ProtocolsHandler,
KeepAlive,
SubstreamProtocol,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
};
use std::{io, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;
#[derive(Default)]
struct TestHandler;
impl ProtocolsHandler for TestHandler {
type InEvent = (); type OutEvent = (); type Error = io::Error;
type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = upgrade::DeniedUpgrade;
type OutboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(upgrade::DeniedUpgrade)
}
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output
) { panic!() }
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) { panic!() }
fn inject_event(&mut self, _: Self::InEvent) {
panic!()
}
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Error>) {
}
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Yes }
fn poll(&mut self, _: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
Poll::Pending
}
}
#[test]
fn raw_swarm_simultaneous_connect() {
for _ in 0 .. 10 {
let mut swarm1 = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into_peer_id(), None)
};
let mut swarm2 = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into_peer_id(), None)
};
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let swarm1_listen_addr = future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
}
})
.now_or_never()
.expect("listen address of swarm1");
let swarm2_listen_addr = future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
}
})
.now_or_never()
.expect("listen address of swarm2");
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum Step {
Start,
Dialing,
Connected,
Replaced,
Denied
}
loop {
let mut swarm1_step = Step::Start;
let mut swarm2_step = Step::Start;
let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
let future = future::poll_fn(|cx| {
loop {
let mut swarm1_not_ready = false;
let mut swarm2_not_ready = false;
if swarm1_step == Step::Start {
if swarm1_dial_start.poll_unpin(cx).is_ready() {
let handler = TestHandler::default().into_node_handler_builder();
swarm1.peer(swarm2.local_peer_id().clone())
.into_not_connected()
.unwrap()
.connect(swarm2_listen_addr.clone(), handler);
swarm1_step = Step::Dialing;
} else {
swarm1_not_ready = true
}
}
if swarm2_step == Step::Start {
if swarm2_dial_start.poll_unpin(cx).is_ready() {
let handler = TestHandler::default().into_node_handler_builder();
swarm2.peer(swarm1.local_peer_id().clone())
.into_not_connected()
.unwrap()
.connect(swarm1_listen_addr.clone(), handler);
swarm2_step = Step::Dialing;
} else {
swarm2_not_ready = true
}
}
if rand::random::<f32>() < 0.1 {
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm1_step, Step::Connected);
swarm1_step = Step::Denied
}
Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm2.local_peer_id());
if swarm1_step == Step::Start {
return Poll::Ready(false)
}
assert_eq!(swarm1_step, Step::Dialing);
swarm1_step = Step::Connected
}
Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => {
assert_eq!(new_info, *swarm2.local_peer_id());
assert_eq!(swarm1_step, Step::Connected);
swarm1_step = Step::Replaced
}
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder())
}
Poll::Ready(ev) => panic!("swarm1: unexpected event: {:?}", ev),
Poll::Pending => swarm1_not_ready = true
}
}
if rand::random::<f32>() < 0.1 {
match swarm2.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm2_step, Step::Connected);
swarm2_step = Step::Denied
}
Poll::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm1.local_peer_id());
if swarm2_step == Step::Start {
return Poll::Ready(false)
}
assert_eq!(swarm2_step, Step::Dialing);
swarm2_step = Step::Connected
}
Poll::Ready(NetworkEvent::Replaced { new_info, .. }) => {
assert_eq!(new_info, *swarm1.local_peer_id());
assert_eq!(swarm2_step, Step::Connected);
swarm2_step = Step::Replaced
}
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder())
}
Poll::Ready(ev) => panic!("swarm2: unexpected event: {:?}", ev),
Poll::Pending => swarm2_not_ready = true
}
}
match (swarm1_step, swarm2_step) {
| (Step::Connected, Step::Replaced)
| (Step::Connected, Step::Denied)
| (Step::Replaced, Step::Connected)
| (Step::Replaced, Step::Denied)
| (Step::Replaced, Step::Replaced)
| (Step::Denied, Step::Connected)
| (Step::Denied, Step::Replaced) => return Poll::Ready(true),
_else => ()
}
if swarm1_not_ready && swarm2_not_ready {
return Poll::Pending
}
}
});
if async_std::task::block_on(future) {
break
}
match swarm1.peer(swarm2.local_peer_id().clone()) {
Peer::Connected(p) => p.close(),
Peer::PendingConnect(p) => p.interrupt(),
x => panic!("Unexpected state for swarm1: {:?}", x)
}
match swarm2.peer(swarm1.local_peer_id().clone()) {
Peer::Connected(p) => p.close(),
Peer::PendingConnect(p) => p.interrupt(),
x => panic!("Unexpected state for swarm2: {:?}", x)
}
}
}
}