#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod connection;
mod registry;
#[cfg(test)]
mod test;
mod upgrade;
pub mod behaviour;
pub mod dial_opts;
pub mod dummy;
mod executor;
pub mod handler;
pub mod keep_alive;
#[doc(hidden)]
pub mod derive_prelude {
pub use crate::behaviour::AddressChange;
pub use crate::behaviour::ConnectionClosed;
pub use crate::behaviour::ConnectionEstablished;
pub use crate::behaviour::DialFailure;
pub use crate::behaviour::ExpiredExternalAddr;
pub use crate::behaviour::ExpiredListenAddr;
pub use crate::behaviour::FromSwarm;
pub use crate::behaviour::ListenFailure;
pub use crate::behaviour::ListenerClosed;
pub use crate::behaviour::ListenerError;
pub use crate::behaviour::NewExternalAddr;
pub use crate::behaviour::NewListenAddr;
pub use crate::behaviour::NewListener;
pub use crate::ConnectionHandler;
pub use crate::DialError;
pub use crate::IntoConnectionHandler;
pub use crate::IntoConnectionHandlerSelect;
pub use crate::NetworkBehaviour;
pub use crate::NetworkBehaviourAction;
pub use crate::PollParameters;
pub use futures::prelude as futures;
pub use libp2p_core::connection::ConnectionId;
pub use libp2p_core::either::EitherOutput;
pub use libp2p_core::transport::ListenerId;
pub use libp2p_core::ConnectedPoint;
pub use libp2p_core::Multiaddr;
pub use libp2p_core::PeerId;
}
pub use behaviour::{
CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
pub use connection::pool::{ConnectionCounters, ConnectionLimits};
pub use connection::{
ConnectionError, ConnectionLimit, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use executor::Executor;
pub use handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr,
IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler,
OneShotHandlerConfig, SubstreamProtocol,
};
#[cfg(feature = "macros")]
pub use libp2p_swarm_derive::NetworkBehaviour;
pub use registry::{AddAddressResult, AddressRecord, AddressScore};
use connection::pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent};
use connection::IncomingInfo;
use dial_opts::{DialOpts, PeerCondition};
use either::Either;
use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream};
use libp2p_core::connection::ConnectionId;
use libp2p_core::muxing::SubstreamBox;
use libp2p_core::{
connection::ConnectedPoint,
multiaddr::Protocol,
multihash::Multihash,
muxing::StreamMuxerBox,
transport::{self, ListenerId, TransportError, TransportEvent},
upgrade::ProtocolName,
Endpoint, Multiaddr, Negotiated, PeerId, Transport,
};
use registry::{AddressIntoIter, Addresses};
use smallvec::SmallVec;
use std::collections::{HashMap, HashSet};
use std::iter;
use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
use std::{
convert::TryFrom,
error, fmt, io,
pin::Pin,
task::{Context, Poll},
};
use upgrade::UpgradeInfoSend as _;
pub type NegotiatedSubstream = Negotiated<SubstreamBox>;
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::OutEvent;
type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
type THandlerInEvent<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
type THandlerOutEvent<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
type THandlerErr<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;
#[derive(Debug)]
pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
Behaviour(TBehaviourOutEvent),
ConnectionEstablished {
peer_id: PeerId,
endpoint: ConnectedPoint,
num_established: NonZeroU32,
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
},
ConnectionClosed {
peer_id: PeerId,
endpoint: ConnectedPoint,
num_established: u32,
cause: Option<ConnectionError<THandlerErr>>,
},
IncomingConnection {
local_addr: Multiaddr,
send_back_addr: Multiaddr,
},
IncomingConnectionError {
local_addr: Multiaddr,
send_back_addr: Multiaddr,
error: PendingInboundConnectionError<io::Error>,
},
OutgoingConnectionError {
peer_id: Option<PeerId>,
error: DialError,
},
BannedPeer {
peer_id: PeerId,
endpoint: ConnectedPoint,
},
NewListenAddr {
listener_id: ListenerId,
address: Multiaddr,
},
ExpiredListenAddr {
listener_id: ListenerId,
address: Multiaddr,
},
ListenerClosed {
listener_id: ListenerId,
addresses: Vec<Multiaddr>,
reason: Result<(), io::Error>,
},
ListenerError {
listener_id: ListenerId,
error: io::Error,
},
Dialing(PeerId),
}
pub struct Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
pool: Pool<THandler<TBehaviour>, transport::Boxed<(PeerId, StreamMuxerBox)>>,
local_peer_id: PeerId,
behaviour: TBehaviour,
supported_protocols: SmallVec<[Vec<u8>; 16]>,
listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
external_addrs: Addresses,
banned_peers: HashSet<PeerId>,
banned_peer_connections: HashSet<ConnectionId>,
pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
}
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
impl<TBehaviour> Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
#[deprecated(
since = "0.41.0",
note = "This constructor is considered ambiguous regarding the executor. Use one of the new, executor-specific constructors or `Swarm::with_threadpool_executor` for the same behaviour."
)]
pub fn new(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self::with_threadpool_executor(transport, behaviour, local_peer_id)
}
pub fn with_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
executor: impl Executor + Send + 'static,
) -> Self {
SwarmBuilder::with_executor(transport, behaviour, local_peer_id, executor).build()
}
#[cfg(all(
feature = "tokio",
not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
))]
pub fn with_tokio_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self::with_executor(
transport,
behaviour,
local_peer_id,
crate::executor::TokioExecutor,
)
}
#[cfg(all(
feature = "async-std",
not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
))]
pub fn with_async_std_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self::with_executor(
transport,
behaviour,
local_peer_id,
crate::executor::AsyncStdExecutor,
)
}
pub fn with_threadpool_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
let builder = match ThreadPoolBuilder::new()
.name_prefix("libp2p-swarm-task-")
.create()
{
Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp),
Err(err) => {
log::warn!("Failed to create executor thread pool: {:?}", err);
SwarmBuilder::without_executor(transport, behaviour, local_peer_id)
}
};
builder.build()
}
#[cfg(feature = "wasm-bindgen")]
pub fn with_wasm_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self::with_executor(
transport,
behaviour,
local_peer_id,
crate::executor::WasmBindgenExecutor,
)
}
pub fn without_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build()
}
pub fn network_info(&self) -> NetworkInfo {
let num_peers = self.pool.num_peers();
let connection_counters = self.pool.counters().clone();
NetworkInfo {
num_peers,
connection_counters,
}
}
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
let id = self.transport.listen_on(addr)?;
#[allow(deprecated)]
self.behaviour.inject_new_listener(id);
Ok(id)
}
pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
self.transport.remove_listener(listener_id)
}
pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
let handler = self.behaviour.new_handler();
self.dial_with_handler(opts.into(), handler)
}
fn dial_with_handler(
&mut self,
swarm_dial_opts: DialOpts,
handler: <TBehaviour as NetworkBehaviour>::ConnectionHandler,
) -> Result<(), DialError> {
let (peer_id, addresses, dial_concurrency_factor_override, role_override) =
match swarm_dial_opts.0 {
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId {
peer_id,
condition,
role_override,
dial_concurrency_factor_override,
})
| dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses {
peer_id,
condition,
role_override,
dial_concurrency_factor_override,
..
}) => {
let condition_matched = match condition {
PeerCondition::Disconnected => !self.is_connected(&peer_id),
PeerCondition::NotDialing => !self.pool.is_dialing(peer_id),
PeerCondition::Always => true,
};
if !condition_matched {
#[allow(deprecated)]
self.behaviour.inject_dial_failure(
Some(peer_id),
handler,
&DialError::DialPeerConditionFalse(condition),
);
return Err(DialError::DialPeerConditionFalse(condition));
}
if self.banned_peers.contains(&peer_id) {
let error = DialError::Banned;
#[allow(deprecated)]
self.behaviour
.inject_dial_failure(Some(peer_id), handler, &error);
return Err(error);
}
let addresses = {
let mut addresses = match swarm_dial_opts.0 {
dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => {
self.behaviour.addresses_of_peer(&peer_id)
}
dial_opts::Opts::WithPeerIdWithAddresses(
dial_opts::WithPeerIdWithAddresses {
peer_id,
mut addresses,
extend_addresses_through_behaviour,
..
},
) => {
if extend_addresses_through_behaviour {
addresses.extend(self.behaviour.addresses_of_peer(&peer_id))
}
addresses
}
dial_opts::Opts::WithoutPeerIdWithAddress { .. } => {
unreachable!("Due to outer match.")
}
};
let mut unique_addresses = HashSet::new();
addresses.retain(|addr| {
!self.listened_addrs.values().flatten().any(|a| a == addr)
&& unique_addresses.insert(addr.clone())
});
if addresses.is_empty() {
let error = DialError::NoAddresses;
#[allow(deprecated)]
self.behaviour
.inject_dial_failure(Some(peer_id), handler, &error);
return Err(error);
};
addresses
};
(
Some(peer_id),
Either::Left(addresses.into_iter()),
dial_concurrency_factor_override,
role_override,
)
}
dial_opts::Opts::WithoutPeerIdWithAddress(
dial_opts::WithoutPeerIdWithAddress {
address,
role_override,
},
) => {
let peer_id = match address
.iter()
.last()
.and_then(|p| {
if let Protocol::P2p(ma) = p {
Some(PeerId::try_from(ma))
} else {
None
}
})
.transpose()
{
Ok(peer_id) => peer_id,
Err(multihash) => return Err(DialError::InvalidPeerId(multihash)),
};
(
peer_id,
Either::Right(iter::once(address)),
None,
role_override,
)
}
};
let dials = addresses
.map(|a| match p2p_addr(peer_id, a) {
Ok(address) => {
let dial = match role_override {
Endpoint::Dialer => self.transport.dial(address.clone()),
Endpoint::Listener => self.transport.dial_as_listener(address.clone()),
};
match dial {
Ok(fut) => fut
.map(|r| (address, r.map_err(TransportError::Other)))
.boxed(),
Err(err) => futures::future::ready((address, Err(err))).boxed(),
}
}
Err(address) => futures::future::ready((
address.clone(),
Err(TransportError::MultiaddrNotSupported(address)),
))
.boxed(),
})
.collect();
match self.pool.add_outgoing(
dials,
peer_id,
handler,
role_override,
dial_concurrency_factor_override,
) {
Ok(_connection_id) => Ok(()),
Err((connection_limit, handler)) => {
let error = DialError::ConnectionLimit(connection_limit);
#[allow(deprecated)]
self.behaviour.inject_dial_failure(peer_id, handler, &error);
Err(error)
}
}
}
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
self.listened_addrs.values().flatten()
}
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
pub fn external_addresses(&self) -> impl Iterator<Item = &AddressRecord> {
self.external_addrs.iter()
}
pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
let result = self.external_addrs.add(a.clone(), s);
let expired = match &result {
AddAddressResult::Inserted { expired } => {
#[allow(deprecated)]
self.behaviour.inject_new_external_addr(&a);
expired
}
AddAddressResult::Updated { expired } => expired,
};
for a in expired {
#[allow(deprecated)]
self.behaviour.inject_expired_external_addr(&a.addr);
}
result
}
pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
if self.external_addrs.remove(addr) {
#[allow(deprecated)]
self.behaviour.inject_expired_external_addr(addr);
true
} else {
false
}
}
pub fn ban_peer_id(&mut self, peer_id: PeerId) {
if self.banned_peers.insert(peer_id) {
self.pool.disconnect(peer_id);
}
}
pub fn unban_peer_id(&mut self, peer_id: PeerId) {
self.banned_peers.remove(&peer_id);
}
#[allow(clippy::result_unit_err)]
pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
let was_connected = self.pool.is_connected(peer_id);
self.pool.disconnect(peer_id);
if was_connected {
Ok(())
} else {
Err(())
}
}
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.pool.is_connected(*peer_id)
}
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool.iter_connected()
}
pub fn behaviour(&self) -> &TBehaviour {
&self.behaviour
}
pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
&mut self.behaviour
}
fn handle_pool_event(
&mut self,
event: PoolEvent<THandler<TBehaviour>, transport::Boxed<(PeerId, StreamMuxerBox)>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
PoolEvent::ConnectionEstablished {
peer_id,
id,
endpoint,
other_established_connection_ids,
concurrent_dial_errors,
} => {
if self.banned_peers.contains(&peer_id) {
self.banned_peer_connections.insert(id);
self.pool.disconnect(peer_id);
return Some(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
#[allow(deprecated)]
self.behaviour.inject_connection_established(
&peer_id,
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
);
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
});
}
}
PoolEvent::PendingOutboundConnectionError {
id: _,
error,
handler,
peer,
} => {
let error = error.into();
#[allow(deprecated)]
self.behaviour.inject_dial_failure(peer, handler, &error);
if let Some(peer) = peer {
log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
} else {
log::debug!("Connection attempt to unknown peer failed with {:?}", error);
}
return Some(SwarmEvent::OutgoingConnectionError {
peer_id: peer,
error,
});
}
PoolEvent::PendingInboundConnectionError {
id: _,
send_back_addr,
local_addr,
error,
handler,
} => {
log::debug!("Incoming connection failed: {:?}", error);
#[allow(deprecated)]
self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
return Some(SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
});
}
PoolEvent::ConnectionClosed {
id,
connected,
error,
remaining_established_connection_ids,
handler,
..
} => {
if let Some(error) = error.as_ref() {
log::debug!(
"Connection closed with error {:?}: {:?}; Total (peer): {}.",
error,
connected,
remaining_established_connection_ids.len()
);
} else {
log::debug!(
"Connection closed: {:?}; Total (peer): {}.",
connected,
remaining_established_connection_ids.len()
);
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !self.banned_peer_connections.remove(&id);
if conn_was_reported {
let remaining_non_banned = remaining_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
#[allow(deprecated)]
self.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler,
remaining_non_banned,
);
}
return Some(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause: error,
num_established,
});
}
PoolEvent::ConnectionEvent { peer_id, id, event } => {
if self.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
#[allow(deprecated)]
self.behaviour.inject_event(peer_id, id, event);
}
}
PoolEvent::AddressChange {
peer_id,
id,
new_endpoint,
old_endpoint,
} => {
if !self.banned_peer_connections.contains(&id) {
#[allow(deprecated)]
self.behaviour.inject_address_change(
&peer_id,
&id,
&old_endpoint,
&new_endpoint,
);
}
}
}
None
}
fn handle_transport_event(
&mut self,
event: TransportEvent<
<transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
io::Error,
>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
TransportEvent::Incoming {
listener_id: _,
upgrade,
local_addr,
send_back_addr,
} => {
let handler = self.behaviour.new_handler();
match self.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
) {
Ok(_connection_id) => {
return Some(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Err((connection_limit, handler)) => {
#[allow(deprecated)]
self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler);
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
};
}
TransportEvent::NewAddress {
listener_id,
listen_addr,
} => {
log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
let addrs = self.listened_addrs.entry(listener_id).or_default();
if !addrs.contains(&listen_addr) {
addrs.push(listen_addr.clone())
}
#[allow(deprecated)]
self.behaviour
.inject_new_listen_addr(listener_id, &listen_addr);
return Some(SwarmEvent::NewListenAddr {
listener_id,
address: listen_addr,
});
}
TransportEvent::AddressExpired {
listener_id,
listen_addr,
} => {
log::debug!(
"Listener {:?}; Expired address {:?}.",
listener_id,
listen_addr
);
if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
addrs.retain(|a| a != &listen_addr);
}
#[allow(deprecated)]
self.behaviour
.inject_expired_listen_addr(listener_id, &listen_addr);
return Some(SwarmEvent::ExpiredListenAddr {
listener_id,
address: listen_addr,
});
}
TransportEvent::ListenerClosed {
listener_id,
reason,
} => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
for addr in addrs.iter() {
#[allow(deprecated)]
self.behaviour.inject_expired_listen_addr(listener_id, addr);
}
#[allow(deprecated)]
self.behaviour.inject_listener_closed(
listener_id,
match &reason {
Ok(()) => Ok(()),
Err(err) => Err(err),
},
);
return Some(SwarmEvent::ListenerClosed {
listener_id,
addresses: addrs.to_vec(),
reason,
});
}
TransportEvent::ListenerError { listener_id, error } => {
#[allow(deprecated)]
self.behaviour.inject_listener_error(listener_id, &error);
return Some(SwarmEvent::ListenerError { listener_id, error });
}
}
None
}
fn handle_behaviour_event(
&mut self,
event: NetworkBehaviourAction<TBehaviour::OutEvent, TBehaviour::ConnectionHandler>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
NetworkBehaviourAction::GenerateEvent(event) => {
return Some(SwarmEvent::Behaviour(event))
}
NetworkBehaviourAction::Dial { opts, handler } => {
let peer_id = opts.get_peer_id();
if let Ok(()) = self.dial_with_handler(opts, handler) {
if let Some(peer_id) = peer_id {
return Some(SwarmEvent::Dialing(peer_id));
}
}
}
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event,
} => {
assert!(self.pending_event.is_none());
let handler = match handler {
NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
NotifyHandler::Any => {
let ids = self
.pool
.iter_established_connections_of_peer(&peer_id)
.collect();
PendingNotifyHandler::Any(ids)
}
};
self.pending_event = Some((peer_id, handler, event));
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
let translated_addresses = {
let mut addrs: Vec<_> = self
.listened_addrs
.values()
.flatten()
.filter_map(|server| self.transport.address_translation(server, &address))
.collect();
addrs.sort_unstable();
addrs.dedup();
addrs
};
for addr in translated_addresses {
self.add_external_address(addr, score);
}
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => match connection {
CloseConnection::One(connection_id) => {
if let Some(conn) = self.pool.get_established(connection_id) {
conn.start_close();
}
}
CloseConnection::All => {
self.pool.disconnect(peer_id);
}
},
}
None
}
fn poll_next_event(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
let this = &mut *self;
loop {
match this.pending_event.take() {
Some((peer_id, handler, event)) => match handler {
PendingNotifyHandler::One(conn_id) => {
match this.pool.get_established(conn_id) {
Some(conn) => match notify_one(conn, event, cx) {
None => continue,
Some(event) => {
this.pending_event = Some((peer_id, handler, event));
}
},
None => continue,
}
}
PendingNotifyHandler::Any(ids) => {
match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) {
None => continue,
Some((event, ids)) => {
let handler = PendingNotifyHandler::Any(ids);
this.pending_event = Some((peer_id, handler, event));
}
}
}
},
None => {
let behaviour_poll = {
let mut parameters = SwarmPollParameters {
local_peer_id: &this.local_peer_id,
supported_protocols: &this.supported_protocols,
listened_addrs: this.listened_addrs.values().flatten().collect(),
external_addrs: &this.external_addrs,
};
this.behaviour.poll(cx, &mut parameters)
};
match behaviour_poll {
Poll::Pending => {}
Poll::Ready(behaviour_event) => {
if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event)
{
return Poll::Ready(swarm_event);
}
continue;
}
}
}
}
match this.pool.poll(cx) {
Poll::Pending => {}
Poll::Ready(pool_event) => {
if let Some(swarm_event) = this.handle_pool_event(pool_event) {
return Poll::Ready(swarm_event);
}
continue;
}
};
match Pin::new(&mut this.transport).poll(cx) {
Poll::Pending => {}
Poll::Ready(transport_event) => {
if let Some(swarm_event) = this.handle_transport_event(transport_event) {
return Poll::Ready(swarm_event);
}
continue;
}
}
return Poll::Pending;
}
}
}
enum PendingNotifyHandler {
One(ConnectionId),
Any(SmallVec<[ConnectionId; 10]>),
}
fn notify_one<THandlerInEvent>(
conn: &mut EstablishedConnection<THandlerInEvent>,
event: THandlerInEvent,
cx: &mut Context<'_>,
) -> Option<THandlerInEvent> {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => Some(event),
Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
let _ = conn.notify_handler(event);
None
}
}
}
fn notify_any<TTrans, THandler, TBehaviour>(
ids: SmallVec<[ConnectionId; 10]>,
pool: &mut Pool<THandler, TTrans>,
event: THandlerInEvent<TBehaviour>,
cx: &mut Context<'_>,
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
TBehaviour: NetworkBehaviour,
THandler: IntoConnectionHandler,
THandler::Handler: ConnectionHandler<
InEvent = THandlerInEvent<TBehaviour>,
OutEvent = THandlerOutEvent<TBehaviour>,
>,
{
let mut pending = SmallVec::new();
let mut event = Some(event); for id in ids.into_iter() {
if let Some(conn) = pool.get_established(id) {
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => pending.push(id),
Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
let e = event.take().expect("by (1),(2)");
if let Err(e) = conn.notify_handler(e) {
event = Some(e) } else {
break;
}
}
}
}
}
event.and_then(|e| {
if !pending.is_empty() {
Some((e, pending))
} else {
None
}
})
}
impl<TBehaviour> Stream for Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>, THandlerErr<TBehaviour>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.as_mut().poll_next_event(cx).map(Some)
}
}
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
fn is_terminated(&self) -> bool {
false
}
}
pub struct SwarmPollParameters<'a> {
local_peer_id: &'a PeerId,
supported_protocols: &'a [Vec<u8>],
listened_addrs: Vec<&'a Multiaddr>,
external_addrs: &'a Addresses,
}
impl<'a> PollParameters for SwarmPollParameters<'a> {
type SupportedProtocolsIter = std::iter::Cloned<std::slice::Iter<'a, std::vec::Vec<u8>>>;
type ListenedAddressesIter = std::iter::Cloned<std::vec::IntoIter<&'a Multiaddr>>;
type ExternalAddressesIter = AddressIntoIter;
fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
self.supported_protocols.iter().cloned()
}
fn listened_addresses(&self) -> Self::ListenedAddressesIter {
self.listened_addrs.clone().into_iter().cloned()
}
fn external_addresses(&self) -> Self::ExternalAddressesIter {
self.external_addrs.clone().into_iter()
}
fn local_peer_id(&self) -> &PeerId {
self.local_peer_id
}
}
pub struct SwarmBuilder<TBehaviour> {
local_peer_id: PeerId,
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
pool_config: PoolConfig,
connection_limits: ConnectionLimits,
}
impl<TBehaviour> SwarmBuilder<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
#[deprecated(
since = "0.41.0",
note = "Use `SwarmBuilder::with_executor` or `SwarmBuilder::without_executor` instead."
)]
pub fn new(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
let executor: Option<Box<dyn Executor + Send>> = match ThreadPoolBuilder::new()
.name_prefix("libp2p-swarm-task-")
.create()
.ok()
{
Some(tp) => Some(Box::new(tp)),
None => None,
};
SwarmBuilder {
local_peer_id,
transport,
behaviour,
pool_config: PoolConfig::new(executor),
connection_limits: Default::default(),
}
}
pub fn with_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
executor: impl Executor + Send + 'static,
) -> Self {
Self {
local_peer_id,
transport,
behaviour,
pool_config: PoolConfig::new(Some(Box::new(executor))),
connection_limits: Default::default(),
}
}
#[cfg(all(
feature = "tokio",
not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
))]
pub fn with_tokio_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self::with_executor(
transport,
behaviour,
local_peer_id,
crate::executor::TokioExecutor,
)
}
#[cfg(all(
feature = "async-std",
not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
))]
pub fn with_async_std_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self::with_executor(
transport,
behaviour,
local_peer_id,
crate::executor::AsyncStdExecutor,
)
}
pub fn without_executor(
transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
behaviour: TBehaviour,
local_peer_id: PeerId,
) -> Self {
Self {
local_peer_id,
transport,
behaviour,
pool_config: PoolConfig::new(None),
connection_limits: Default::default(),
}
}
#[deprecated(since = "0.41.0", note = "Use `SwarmBuilder::with_executor` instead.")]
pub fn executor(mut self, executor: Box<dyn Executor + Send>) -> Self {
self.pool_config = self.pool_config.with_executor(executor);
self
}
pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
self
}
pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
self.pool_config = self.pool_config.with_connection_event_buffer_size(n);
self
}
pub fn dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
self
}
pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.connection_limits = limits;
self
}
pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
self
}
pub fn max_negotiating_inbound_streams(mut self, v: usize) -> Self {
self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
self
}
pub fn build(mut self) -> Swarm<TBehaviour> {
let supported_protocols = self
.behaviour
.new_handler()
.inbound_protocol()
.protocol_info()
.into_iter()
.map(|info| info.protocol_name().to_vec())
.collect();
Swarm {
local_peer_id: self.local_peer_id,
transport: self.transport,
pool: Pool::new(self.local_peer_id, self.pool_config, self.connection_limits),
behaviour: self.behaviour,
supported_protocols,
listened_addrs: HashMap::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
banned_peer_connections: HashSet::new(),
pending_event: None,
}
}
}
#[derive(Debug)]
pub enum DialError {
Banned,
ConnectionLimit(ConnectionLimit),
LocalPeerId,
NoAddresses,
DialPeerConditionFalse(dial_opts::PeerCondition),
Aborted,
InvalidPeerId(Multihash),
WrongPeerId {
obtained: PeerId,
endpoint: ConnectedPoint,
},
ConnectionIo(io::Error),
Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
impl From<PendingOutboundConnectionError<io::Error>> for DialError {
fn from(error: PendingOutboundConnectionError<io::Error>) -> Self {
match error {
PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit),
PendingConnectionError::Aborted => DialError::Aborted,
PendingConnectionError::WrongPeerId { obtained, endpoint } => {
DialError::WrongPeerId { obtained, endpoint }
}
PendingConnectionError::IO(e) => DialError::ConnectionIo(e),
PendingConnectionError::Transport(e) => DialError::Transport(e),
}
}
}
impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::LocalPeerId => write!(f, "Dial error: tried to dial local peer id."),
DialError::Banned => write!(f, "Dial error: peer is banned."),
DialError::DialPeerConditionFalse(c) => {
write!(
f,
"Dial error: condition {:?} for dialing peer was false.",
c
)
}
DialError::Aborted => write!(
f,
"Dial error: Pending connection attempt has been aborted."
),
DialError::InvalidPeerId(multihash) => {
write!(f, "Dial error: multihash {:?} is not a PeerId", multihash)
}
DialError::WrongPeerId { obtained, endpoint } => write!(
f,
"Dial error: Unexpected peer ID {} at {:?}.",
obtained, endpoint
),
DialError::ConnectionIo(e) => write!(
f,
"Dial error: An I/O error occurred on the connection: {:?}.",
e
),
DialError::Transport(errors) => {
write!(f, "Failed to negotiate transport protocol(s): [")?;
for (addr, error) in errors {
write!(f, "({addr}")?;
print_error_chain(f, error)?;
write!(f, ")")?;
}
write!(f, "]")?;
Ok(())
}
}
}
}
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
write!(f, ": {e}")?;
if let Some(source) = e.source() {
print_error_chain(f, source)?;
}
Ok(())
}
impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DialError::ConnectionLimit(err) => Some(err),
DialError::LocalPeerId => None,
DialError::NoAddresses => None,
DialError::Banned => None,
DialError::DialPeerConditionFalse(_) => None,
DialError::Aborted => None,
DialError::InvalidPeerId { .. } => None,
DialError::WrongPeerId { .. } => None,
DialError::ConnectionIo(_) => None,
DialError::Transport(_) => None,
}
}
}
#[derive(Clone, Debug)]
pub struct NetworkInfo {
num_peers: usize,
connection_counters: ConnectionCounters,
}
impl NetworkInfo {
pub fn num_peers(&self) -> usize {
self.num_peers
}
pub fn connection_counters(&self) -> &ConnectionCounters {
&self.connection_counters
}
}
fn p2p_addr(peer: Option<PeerId>, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
let peer = match peer {
Some(p) => p,
None => return Ok(addr),
};
if let Some(Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr);
}
Ok(addr)
} else {
Ok(addr.with(Protocol::P2p(peer.into())))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::{CallTraceBehaviour, MockBehaviour};
use futures::executor::block_on;
use futures::executor::ThreadPool;
use futures::future::poll_fn;
use futures::future::Either;
use futures::{executor, future, ready};
use libp2p_core::either::EitherError;
use libp2p_core::multiaddr::multiaddr;
use libp2p_core::transport::memory::MemoryTransportError;
use libp2p_core::transport::TransportEvent;
use libp2p_core::{identity, multiaddr, transport, upgrade};
use libp2p_core::{Endpoint, UpgradeError};
use libp2p_plaintext as plaintext;
use libp2p_yamux as yamux;
use quickcheck::*;
use void::Void;
enum State {
Connecting,
Disconnecting,
}
fn new_test_swarm<T, O>(
handler_proto: T,
) -> SwarmBuilder<CallTraceBehaviour<MockBehaviour<T, O>>>
where
T: ConnectionHandler + Clone,
T::OutEvent: Clone,
O: Send + 'static,
{
let id_keys = identity::Keypair::generate_ed25519();
let local_public_key = id_keys.public();
let transport = transport::MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(plaintext::PlainText2Config {
local_public_key: local_public_key.clone(),
})
.multiplex(yamux::YamuxConfig::default())
.boxed();
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
match ThreadPool::new().ok() {
Some(tp) => {
SwarmBuilder::with_executor(transport, behaviour, local_public_key.into(), tp)
}
None => SwarmBuilder::without_executor(transport, behaviour, local_public_key.into()),
}
}
fn swarms_connected<TBehaviour>(
swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
num_connections: usize,
) -> bool
where
TBehaviour: NetworkBehaviour,
<<TBehaviour::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent: Clone,
{
swarm1
.behaviour()
.num_connections_to_peer(*swarm2.local_peer_id())
== num_connections
&& swarm2
.behaviour()
.num_connections_to_peer(*swarm1.local_peer_id())
== num_connections
&& swarm1.is_connected(swarm2.local_peer_id())
&& swarm2.is_connected(swarm1.local_peer_id())
}
fn swarms_disconnected<TBehaviour: NetworkBehaviour>(
swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
) -> bool
where
TBehaviour: NetworkBehaviour,
<<TBehaviour::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent: Clone
{
swarm1
.behaviour()
.num_connections_to_peer(*swarm2.local_peer_id())
== 0
&& swarm2
.behaviour()
.num_connections_to_peer(*swarm1.local_peer_id())
== 0
&& !swarm1.is_connected(swarm2.local_peer_id())
&& !swarm2.is_connected(swarm1.local_peer_id())
}
#[test]
fn test_connect_disconnect_ban() {
let handler_proto = keep_alive::ConnectionHandler;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
enum Stage {
Connecting,
Banned,
BannedDial,
Unbanned,
Reconnecting,
}
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
let mut s1_expected_conns = num_connections;
let mut s2_expected_conns = num_connections;
let mut stage = Stage::Connecting;
executor::block_on(future::poll_fn(move |cx| loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match stage {
Stage::Connecting => {
if swarm1.behaviour.assert_connected(s1_expected_conns, 1)
&& swarm2.behaviour.assert_connected(s2_expected_conns, 1)
{
swarm2.ban_peer_id(swarm1_id);
stage = Stage::Banned;
}
}
Stage::Banned => {
if swarm1.behaviour.assert_disconnected(s1_expected_conns, 1)
&& swarm2.behaviour.assert_disconnected(s2_expected_conns, 1)
{
swarm1.dial(addr2.clone()).unwrap();
s1_expected_conns += 1;
stage = Stage::BannedDial;
}
}
Stage::BannedDial => {
if swarm2.network_info().num_peers() == 1 {
assert_eq!(
swarm2.behaviour.on_connection_established.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
swarm2.unban_peer_id(swarm1_id);
stage = Stage::Unbanned;
}
}
Stage::Unbanned => {
if swarm2.network_info().num_peers() == 0 {
assert_eq!(
swarm2.behaviour.on_connection_closed.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
assert!(swarm2.banned_peer_connections.is_empty());
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
s1_expected_conns += num_connections;
s2_expected_conns += num_connections;
stage = Stage::Reconnecting;
}
}
Stage::Reconnecting => {
if swarm1.behaviour.on_connection_established.len() == s1_expected_conns
&& swarm2.behaviour.assert_connected(s2_expected_conns, 2)
{
return Poll::Ready(());
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
}
}))
}
#[test]
fn test_swarm_disconnect() {
let handler_proto = keep_alive::ConnectionHandler;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone()).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let mut reconnected = false;
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
if reconnected {
return Poll::Ready(());
}
swarm2
.disconnect_peer_id(swarm1_id)
.expect("Error disconnecting");
state = State::Disconnecting;
}
}
State::Disconnecting => {
if swarms_disconnected(&swarm1, &swarm2) {
if reconnected {
return Poll::Ready(());
}
reconnected = true;
for _ in 0..num_connections {
swarm2.dial(addr1.clone()).unwrap();
}
state = State::Connecting;
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
}
}))
}
#[test]
fn test_behaviour_disconnect_all() {
let handler_proto = keep_alive::ConnectionHandler;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone()).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let mut reconnected = false;
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
if reconnected {
return Poll::Ready(());
}
swarm2.behaviour.inner().next_action.replace(
NetworkBehaviourAction::CloseConnection {
peer_id: swarm1_id,
connection: CloseConnection::All,
},
);
state = State::Disconnecting;
continue;
}
}
State::Disconnecting => {
if swarms_disconnected(&swarm1, &swarm2) {
reconnected = true;
for _ in 0..num_connections {
swarm2.dial(addr1.clone()).unwrap();
}
state = State::Connecting;
continue;
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
}
}))
}
#[test]
fn test_behaviour_disconnect_one() {
let handler_proto = keep_alive::ConnectionHandler;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
let mut disconnected_conn_id = None;
executor::block_on(future::poll_fn(move |cx| loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
disconnected_conn_id = {
let conn_id =
swarm2.behaviour.on_connection_established[num_connections / 2].1;
swarm2.behaviour.inner().next_action.replace(
NetworkBehaviourAction::CloseConnection {
peer_id: swarm1_id,
connection: CloseConnection::One(conn_id),
},
);
Some(conn_id)
};
state = State::Disconnecting;
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
assert!(s
.behaviour
.on_connection_closed
.iter()
.all(|(.., remaining_conns)| *remaining_conns > 0));
assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
s.behaviour.assert_connected(num_connections, 1);
}
if [&swarm1, &swarm2]
.iter()
.all(|s| s.behaviour.on_connection_closed.len() == 1)
{
let conn_id = swarm2.behaviour.on_connection_closed[0].1;
assert_eq!(Some(conn_id), disconnected_conn_id);
return Poll::Ready(());
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
}
}))
}
#[test]
fn concurrent_dialing() {
#[derive(Clone, Debug)]
struct DialConcurrencyFactor(NonZeroU8);
impl Arbitrary for DialConcurrencyFactor {
fn arbitrary(g: &mut Gen) -> Self {
Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
}
}
fn prop(concurrency_factor: DialConcurrencyFactor) {
block_on(async {
let mut swarm = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.dial_concurrency_factor(concurrency_factor.0)
.build();
let num_listen_addrs = concurrency_factor.0.get() + 2;
let mut listen_addresses = Vec::new();
let mut transports = Vec::new();
for _ in 0..num_listen_addrs {
let mut transport = transport::MemoryTransport::default().boxed();
transport.listen_on("/memory/0".parse().unwrap()).unwrap();
match transport.select_next_some().await {
TransportEvent::NewAddress { listen_addr, .. } => {
listen_addresses.push(listen_addr);
}
_ => panic!("Expected `NewListenAddr` event."),
}
transports.push(transport);
}
swarm
.dial(
DialOpts::peer_id(PeerId::random())
.addresses(listen_addresses)
.build(),
)
.unwrap();
for mut transport in transports.into_iter() {
loop {
match futures::future::select(transport.select_next_some(), swarm.next())
.await
{
Either::Left((TransportEvent::Incoming { .. }, _)) => {
break;
}
Either::Left(_) => {
panic!("Unexpected transport event.")
}
Either::Right((e, _)) => {
panic!("Expect swarm to not emit any event {:?}", e)
}
}
}
}
match swarm.next().await.unwrap() {
SwarmEvent::OutgoingConnectionError { .. } => {}
e => panic!("Unexpected swarm event {:?}", e),
}
})
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
}
#[test]
fn max_outgoing() {
use rand::Rng;
let outgoing_limit = rand::thread_rng().gen_range(1..10);
let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit));
let mut network = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.connection_limits(limits)
.build();
let addr: Multiaddr = "/memory/1234".parse().unwrap();
let target = PeerId::random();
for _ in 0..outgoing_limit {
network
.dial(
DialOpts::peer_id(target)
.addresses(vec![addr.clone()])
.build(),
)
.expect("Unexpected connection limit.");
}
match network
.dial(DialOpts::peer_id(target).addresses(vec![addr]).build())
.expect_err("Unexpected dialing success.")
{
DialError::ConnectionLimit(limit) => {
assert_eq!(limit.current, outgoing_limit);
assert_eq!(limit.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}
let info = network.network_info();
assert_eq!(info.num_peers(), 0);
assert_eq!(
info.connection_counters().num_pending_outgoing(),
outgoing_limit
);
}
#[test]
fn max_established_incoming() {
#[derive(Debug, Clone)]
struct Limit(u32);
impl Arbitrary for Limit {
fn arbitrary(g: &mut Gen) -> Self {
Self(g.gen_range(1..10))
}
}
fn limits(limit: u32) -> ConnectionLimits {
ConnectionLimits::default().with_max_established_incoming(Some(limit))
}
fn prop(limit: Limit) {
let limit = limit.0;
let mut network1 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.connection_limits(limits(limit))
.build();
let mut network2 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler)
.connection_limits(limits(limit))
.build();
let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap();
let listen_addr = async_std::task::block_on(poll_fn(|cx| {
match ready!(network1.poll_next_unpin(cx)).unwrap() {
SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address),
e => panic!("Unexpected network event: {:?}", e),
}
}));
async_std::task::block_on({
let mut n = 0;
network2.dial(listen_addr.clone()).unwrap();
let mut expected_closed = false;
let mut network_1_established = false;
let mut network_2_established = false;
let mut network_1_limit_reached = false;
let mut network_2_limit_reached = false;
poll_fn(move |cx| {
loop {
let mut network_1_pending = false;
let mut network_2_pending = false;
match network1.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {}
Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
network_1_established = true;
}
Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
error: PendingConnectionError::ConnectionLimit(err),
..
})) => {
assert_eq!(err.limit, limit);
assert_eq!(err.limit, err.current);
let info = network1.network_info();
let counters = info.connection_counters();
assert_eq!(counters.num_established_incoming(), limit);
assert_eq!(counters.num_established(), limit);
network_1_limit_reached = true;
}
Poll::Pending => {
network_1_pending = true;
}
e => panic!("Unexpected network event: {:?}", e),
}
match network2.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
network_2_established = true;
}
Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => {
assert!(expected_closed);
let info = network2.network_info();
let counters = info.connection_counters();
assert_eq!(counters.num_established_outgoing(), limit);
assert_eq!(counters.num_established(), limit);
network_2_limit_reached = true;
}
Poll::Pending => {
network_2_pending = true;
}
e => panic!("Unexpected network event: {:?}", e),
}
if network_1_pending && network_2_pending {
return Poll::Pending;
}
if network_1_established && network_2_established {
network_1_established = false;
network_2_established = false;
if n <= limit {
n += 1;
network2.dial(listen_addr.clone()).unwrap();
if n == limit {
expected_closed = true;
}
} else {
panic!("Expect networks not to establish connections beyond the limit.")
}
}
if network_1_limit_reached && network_2_limit_reached {
return Poll::Ready(());
}
}
})
});
}
quickcheck(prop as fn(_));
}
#[test]
fn invalid_peer_id() {
let mut swarm1 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
let mut swarm2 = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();
let address =
futures::executor::block_on(future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => {
Poll::Ready(address)
}
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}));
let other_id = PeerId::random();
let other_addr = address.with(Protocol::P2p(other_id.into()));
swarm2.dial(other_addr.clone()).unwrap();
let (peer_id, error) = futures::executor::block_on(future::poll_fn(|cx| {
if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
swarm1.poll_next_unpin(cx)
{}
match swarm2.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
peer_id, error, ..
})) => Poll::Ready((peer_id, error)),
Poll::Ready(x) => panic!("unexpected {:?}", x),
Poll::Pending => Poll::Pending,
}
}));
assert_eq!(peer_id.unwrap(), other_id);
match error {
DialError::WrongPeerId { obtained, endpoint } => {
assert_eq!(obtained, *swarm1.local_peer_id());
assert_eq!(
endpoint,
ConnectedPoint::Dialer {
address: other_addr,
role_override: Endpoint::Dialer,
}
);
}
x => panic!("wrong error {:?}", x),
}
}
#[test]
fn dial_self() {
let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
let local_address =
futures::executor::block_on(future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => {
Poll::Ready(address)
}
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}));
swarm.dial(local_address.clone()).unwrap();
let mut got_dial_err = false;
let mut got_inc_err = false;
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
peer_id,
error: DialError::WrongPeerId { .. },
..
})) => {
assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
assert!(!got_dial_err);
got_dial_err = true;
if got_inc_err {
return Poll::Ready(Ok(()));
}
}
Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
local_addr, ..
})) => {
assert!(!got_inc_err);
assert_eq!(local_addr, local_address);
got_inc_err = true;
if got_dial_err {
return Poll::Ready(Ok(()));
}
}
Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
assert_eq!(local_addr, local_address);
}
Poll::Ready(ev) => {
panic!("Unexpected event: {:?}", ev)
}
Poll::Pending => break Poll::Pending,
}
}
}))
.unwrap();
}
#[test]
fn dial_self_by_id() {
let swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
let peer_id = *swarm.local_peer_id();
assert!(!swarm.is_connected(&peer_id));
}
#[async_std::test]
async fn multiple_addresses_err() {
let target = PeerId::random();
let mut swarm = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
let addresses = HashSet::from([
multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
multiaddr![Udp(rand::random::<u16>())],
multiaddr![Udp(rand::random::<u16>())],
multiaddr![Udp(rand::random::<u16>())],
multiaddr![Udp(rand::random::<u16>())],
multiaddr![Udp(rand::random::<u16>())],
]);
swarm
.dial(
DialOpts::peer_id(target)
.addresses(addresses.iter().cloned().collect())
.build(),
)
.unwrap();
match swarm.next().await.unwrap() {
SwarmEvent::OutgoingConnectionError {
peer_id,
error: DialError::Transport(errors),
} => {
assert_eq!(target, peer_id.unwrap());
let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
let expected_addresses = addresses
.into_iter()
.map(|addr| addr.with(Protocol::P2p(target.into())))
.collect::<Vec<_>>();
assert_eq!(expected_addresses, failed_addresses);
}
e => panic!("Unexpected event: {e:?}"),
}
}
#[test]
fn aborting_pending_connection_surfaces_error() {
let _ = env_logger::try_init();
let mut dialer = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
let mut listener = new_test_swarm::<_, ()>(dummy::ConnectionHandler).build();
let listener_peer_id = *listener.local_peer_id();
listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
let listener_address = match block_on(listener.next()).unwrap() {
SwarmEvent::NewListenAddr { address, .. } => address,
e => panic!("Unexpected network event: {:?}", e),
};
dialer
.dial(
DialOpts::peer_id(listener_peer_id)
.addresses(vec![listener_address])
.build(),
)
.unwrap();
dialer
.disconnect_peer_id(listener_peer_id)
.expect_err("Expect peer to not yet be connected.");
match block_on(dialer.next()).unwrap() {
SwarmEvent::OutgoingConnectionError {
error: DialError::Aborted,
..
} => {}
e => panic!("Unexpected swarm event {:?}.", e),
}
}
#[test]
fn dial_error_prints_sources() {
let error = DialError::Transport(vec![(
"/ip4/127.0.0.1/tcp/80".parse().unwrap(),
TransportError::Other(io::Error::new(
io::ErrorKind::Other,
EitherError::<_, Void>::A(EitherError::<Void, _>::B(UpgradeError::Apply(
MemoryTransportError::Unreachable,
))),
)),
)]);
let string = format!("{error}");
assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : Handshake failed: No listener on the given port.)]", string)
}
}