mod util;
use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::multiaddr::multiaddr;
use libp2p_core::nodes::network::{Network, NetworkEvent, NetworkReachError, PeerState, UnknownPeerDialErr, IncomingError};
use libp2p_core::{muxing::StreamMuxerBox, PeerId, Transport, upgrade};
use libp2p_swarm::{
NegotiatedSubstream,
ProtocolsHandler,
KeepAlive,
SubstreamProtocol,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
protocols_handler::NodeHandlerWrapperBuilder
};
use rand::seq::SliceRandom;
use std::{io, task::Context, task::Poll};
#[derive(Default)]
struct TestHandler;
impl ProtocolsHandler for TestHandler {
type InEvent = (); type OutEvent = (); type Error = io::Error;
type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = upgrade::DeniedUpgrade;
type OutboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(upgrade::DeniedUpgrade)
}
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output
) { panic!() }
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) { panic!() }
fn inject_event(&mut self, _: Self::InEvent) {
panic!()
}
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Error>) {
}
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No }
fn poll(&mut self, _: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
Poll::Pending
}
}
#[test]
fn deny_incoming_connec() {
let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler>, _, _> = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
let mut swarm2 = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let address = async_std::task::block_on(future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
}
}));
swarm2
.peer(swarm1.local_peer_id().clone())
.into_not_connected().unwrap()
.connect(address.clone(), TestHandler::default().into_node_handler_builder());
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => drop(inc),
Poll::Ready(_) => unreachable!(),
Poll::Pending => (),
}
match swarm2.poll(cx) {
Poll::Ready(NetworkEvent::DialError {
new_state: PeerState::NotConnected,
peer_id,
multiaddr,
error: NetworkReachError::Transport(_)
}) => {
assert_eq!(peer_id, *swarm1.local_peer_id());
assert_eq!(multiaddr, address);
return Poll::Ready(Ok(()));
},
Poll::Ready(_) => unreachable!(),
Poll::Pending => (),
}
Poll::Pending
})).unwrap();
}
#[test]
fn dial_self() {
let mut swarm = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
})
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let (address, mut swarm) = async_std::task::block_on(
future::lazy(move |cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) {
Ok::<_, void::Void>((listen_addr, swarm))
} else {
panic!("Was expecting the listen address to be reported")
}
}))
.unwrap();
swarm.dial(address.clone(), TestHandler::default().into_node_handler_builder()).unwrap();
let mut got_dial_err = false;
let mut got_inc_err = false;
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::UnknownPeerDialError {
multiaddr,
error: UnknownPeerDialErr::FoundLocalPeerId,
handler: _
}) => {
assert_eq!(multiaddr, address);
assert!(!got_dial_err);
got_dial_err = true;
if got_inc_err {
return Poll::Ready(Ok(()));
}
},
Poll::Ready(NetworkEvent::IncomingConnectionError {
local_addr,
send_back_addr: _,
error: IncomingError::FoundLocalPeerId
}) => {
assert_eq!(address, local_addr);
assert!(!got_inc_err);
got_inc_err = true;
if got_dial_err {
return Poll::Ready(Ok(()));
}
},
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
assert_eq!(*inc.local_addr(), address);
inc.accept(TestHandler::default().into_node_handler_builder());
},
Poll::Ready(ev) => {
panic!("Unexpected event: {:?}", ev)
}
Poll::Pending => break Poll::Pending,
}
}
})).unwrap();
}
#[test]
fn dial_self_by_id() {
let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler>, _, _> = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
let peer_id = swarm.local_peer_id().clone();
assert!(swarm.peer(peer_id).into_not_connected().is_none());
}
#[test]
fn multiple_addresses_err() {
let mut swarm = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
let mut addresses = Vec::new();
for _ in 0 .. 3 {
addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
}
for _ in 0 .. 5 {
addresses.push(multiaddr![Udp(rand::random::<u16>())]);
}
addresses.shuffle(&mut rand::thread_rng());
let target = PeerId::random();
swarm.peer(target.clone())
.into_not_connected().unwrap()
.connect_iter(addresses.clone(), TestHandler::default().into_node_handler_builder())
.unwrap();
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::DialError {
new_state,
peer_id,
multiaddr,
error: NetworkReachError::Transport(_)
}) => {
assert_eq!(peer_id, target);
let expected = addresses.remove(0);
assert_eq!(multiaddr, expected);
if addresses.is_empty() {
assert_eq!(new_state, PeerState::NotConnected);
return Poll::Ready(Ok(()));
} else {
match new_state {
PeerState::Dialing { num_pending_addresses } => {
assert_eq!(num_pending_addresses.get(), addresses.len());
},
_ => panic!()
}
}
},
Poll::Ready(_) => unreachable!(),
Poll::Pending => break Poll::Pending,
}
}
})).unwrap();
}