mod behaviour;
mod registry;
mod upgrade;
pub mod protocols_handler;
pub mod toggle;
pub use behaviour::{
NetworkBehaviour,
NetworkBehaviourAction,
NetworkBehaviourEventProcess,
PollParameters,
NotifyHandler,
DialPeerCondition
};
pub use protocols_handler::{
IntoProtocolsHandler,
IntoProtocolsHandlerSelect,
KeepAlive,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerSelect,
ProtocolsHandlerUpgrErr,
OneShotHandler,
OneShotHandlerConfig,
SubstreamProtocol
};
use protocols_handler::{
NodeHandlerWrapperBuilder,
NodeHandlerWrapperError,
};
use futures::{
prelude::*,
executor::{ThreadPool, ThreadPoolBuilder},
stream::FusedStream,
};
use libp2p_core::{
Executor,
Transport,
Multiaddr,
Negotiated,
PeerId,
connection::{
ConnectionError,
ConnectionId,
ConnectionInfo,
ConnectionLimit,
ConnectedPoint,
EstablishedConnection,
IntoConnectionHandler,
ListenerId,
PendingConnectionError,
Substream
},
transport::{TransportError, boxed::Boxed as BoxTransport},
muxing::{StreamMuxer, StreamMuxerBox},
network::{
Network,
NetworkInfo,
NetworkEvent,
NetworkConfig,
peer::ConnectedPeer,
},
upgrade::ProtocolName,
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, hash::Hash, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;
/// Convenience alias for an [`ExpandedSwarm`] whose handler-related type
/// parameters are all derived from the behaviour.
///
/// The handler input and output event types as well as the handler type itself
/// are taken from `TBehaviour::ProtocolsHandler`, so callers only have to name
/// the behaviour and, optionally, the connection info type (which defaults to
/// `PeerId`).
pub type Swarm<TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
TConnInfo,
>;
/// Alias for a [`Negotiated`] wrapper around a [`Substream`] of the boxed
/// stream muxer ([`StreamMuxerBox`]) that the swarm's transport is converted to.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
/// Event produced by polling an [`ExpandedSwarm`] (see
/// `ExpandedSwarm::next_event`).
///
/// `TBvEv` is the event type of the network behaviour and `THandleErr` the
/// error type of its protocols handler.
#[derive(Debug)]
pub enum SwarmEvent<TBvEv, THandleErr> {
/// Event generated by the `NetworkBehaviour`
/// (`NetworkBehaviourAction::GenerateEvent`).
Behaviour(TBvEv),
/// The network reported a newly established connection to a peer that is
/// not banned.
ConnectionEstablished {
/// Identity of the remote peer.
peer_id: PeerId,
/// Endpoint of the connection that was established.
endpoint: ConnectedPoint,
/// Connection count for this peer as reported by the network; the
/// behaviour's `inject_connected` is called when this is `1`.
num_established: NonZeroU32,
},
/// The network reported that an established connection was closed.
ConnectionClosed {
/// Identity of the remote peer.
peer_id: PeerId,
/// Endpoint of the connection that was closed.
endpoint: ConnectedPoint,
/// Connection count for this peer as reported by the network; the
/// behaviour's `inject_disconnected` is called when this is `0`.
num_established: u32,
/// The error reported by the network for the closed connection.
cause: ConnectionError<NodeHandlerWrapperError<THandleErr>>,
},
/// An incoming connection was reported by a listener and the swarm
/// attempted to accept it. A rejected attempt is only logged, so this event
/// is emitted either way.
IncomingConnection {
/// Local address on which the connection arrived.
local_addr: Multiaddr,
/// Address to use to send data back to the remote.
send_back_addr: Multiaddr,
},
/// An incoming connection failed before being established.
IncomingConnectionError {
/// Local address on which the connection arrived.
local_addr: Multiaddr,
/// Address to use to send data back to the remote.
send_back_addr: Multiaddr,
/// The error that occurred.
error: PendingConnectionError<io::Error>,
},
/// A connection was established with a peer that is in the ban list; the
/// swarm disconnects the peer before emitting this event.
BannedPeer {
/// Identity of the banned peer.
peer_id: PeerId,
/// Endpoint of the connection that was established.
endpoint: ConnectedPoint,
},
/// Dialing one address of a known peer failed.
UnreachableAddr {
/// Identity of the peer that was being dialed.
peer_id: PeerId,
/// Address that could not be reached.
address: Multiaddr,
/// The error that occurred.
error: PendingConnectionError<io::Error>,
/// Number of remaining attempts as reported by the network; the
/// behaviour's `inject_dial_failure` is called when this is `0`.
attempts_remaining: u32,
},
/// Dialing an address without a known peer identity failed.
UnknownPeerUnreachableAddr {
/// Address that could not be reached.
address: Multiaddr,
/// The error that occurred.
error: PendingConnectionError<io::Error>,
},
/// A listener started listening on a new address.
NewListenAddr(Multiaddr),
/// A listener stopped listening on an address.
ExpiredListenAddr(Multiaddr),
/// A listener was closed.
ListenerClosed {
/// Addresses the closed listener was listening on, as reported by the
/// network.
addresses: Vec<Multiaddr>,
/// Why the listener closed: `Ok(())` or the error that caused it.
reason: Result<(), io::Error>,
},
/// A listener reported an error.
ListenerError {
/// The error reported by the listener.
error: io::Error,
},
/// A dialing attempt to the given peer was started in response to a
/// `NetworkBehaviourAction::DialPeer` request.
Dialing(PeerId),
}
/// Combines a `Network` (transport, listeners and connections) with a
/// `NetworkBehaviour` and drives both from a single poll loop.
///
/// Most users name this type through the [`Swarm`] alias, which derives the
/// event and handler type parameters from the behaviour.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo = PeerId>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
/// The underlying network, using a boxed transport whose output is the
/// connection info together with a boxed stream muxer.
network: Network<
BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
TConnInfo,
PeerId,
>,
/// The behaviour driven by this swarm; also reachable through `Deref`.
behaviour: TBehaviour,
/// Protocol names of the handler's inbound protocol, collected once when
/// the swarm is built and exposed through `PollParameters`.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
/// Addresses reported by listeners via `NewListenerAddress` and not yet
/// expired.
listened_addrs: SmallVec<[Multiaddr; 8]>,
/// Externally reachable addresses, added explicitly or derived from
/// observed addresses reported by the behaviour.
external_addrs: Addresses,
/// Peers for which new connections are immediately closed and dialing
/// requests from the behaviour are refused.
banned_peers: HashSet<PeerId>,
/// An event the behaviour asked to deliver to connection handler(s) that
/// could not be delivered yet because no targeted handler was ready.
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>
}
/// Gives shared access to the wrapped behaviour, so that its methods can be
/// called directly on the swarm.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Deref
    for ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
    THandler: IntoProtocolsHandler,
    TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
    type Target = TBehaviour;

    /// Returns a reference to the behaviour owned by the swarm.
    fn deref(&self) -> &TBehaviour {
        let Self { behaviour, .. } = self;
        behaviour
    }
}
/// Gives exclusive access to the wrapped behaviour, so that its mutating
/// methods can be called directly on the swarm.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> DerefMut
    for ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
    THandler: IntoProtocolsHandler,
    TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
    /// Returns a mutable reference to the behaviour owned by the swarm.
    fn deref_mut(&mut self) -> &mut TBehaviour {
        let Self { behaviour, .. } = self;
        behaviour
    }
}
/// Marks the swarm as `Unpin` regardless of its type parameters.
///
/// The polling code in this file relies on this: it builds pins with
/// `Pin::new` and takes `&mut` access through a `Pin<&mut Self>`.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
THandler: IntoProtocolsHandler,
TConnInfo: ConnectionInfo<PeerId = PeerId>,
{
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo, THandleErr>
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
    TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
    TInEvent: Clone + Send + 'static,
    TOutEvent: Send + 'static,
    TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
    THandler: IntoProtocolsHandler + Send + 'static,
    THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
    THandleErr: error::Error + Send + 'static,
{
    /// Builds a new swarm from a transport, a behaviour and the local peer id,
    /// using the default configuration of [`SwarmBuilder`].
    pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
    where
        TMuxer: StreamMuxer + Send + Sync + 'static,
        <TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
        <TMuxer as StreamMuxer>::Substream: Send + 'static,
        TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
        TTransport::Error: Send + Sync + 'static,
        TTransport::Listener: Send + 'static,
        TTransport::ListenerUpgrade: Send + 'static,
        TTransport::Dial: Send + 'static,
    {
        SwarmBuilder::new(transport, behaviour, local_peer_id).build()
    }

    /// Returns information about the underlying [`Network`].
    pub fn network_info(me: &Self) -> NetworkInfo {
        me.network.info()
    }

    /// Starts listening on the given address.
    ///
    /// Returns the id of the new listener, or the transport error if the
    /// address cannot be listened on.
    pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
        me.network.listen_on(addr)
    }

    /// Removes the listener with the given id.
    ///
    /// The result is forwarded unchanged from the underlying network.
    pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
        me.network.remove_listener(id)
    }

    /// Dials the given address with a fresh handler from the behaviour,
    /// without knowing the identity of the remote peer.
    ///
    /// Fails with [`ConnectionLimit`] if the network refuses the new dial.
    pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
        let handler = me.behaviour.new_handler();
        me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ())
    }

    /// Dials a known peer using the addresses provided by the behaviour's
    /// `addresses_of_peer`.
    ///
    /// Addresses the swarm itself listens on are skipped. If no address is
    /// left or the network refuses the dial, the failure is logged, reported
    /// to the behaviour via `inject_dial_failure`, and returned.
    pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
        let self_listening = &me.listened_addrs;
        let mut addrs = me.behaviour.addresses_of_peer(peer_id)
            .into_iter()
            .filter(|a| !self_listening.contains(a));

        let result =
            if let Some(first) = addrs.next() {
                let handler = me.behaviour.new_handler().into_node_handler_builder();
                me.network.peer(peer_id.clone())
                    .dial(first, addrs, handler)
                    .map(|_| ())
                    .map_err(DialError::ConnectionLimit)
            } else {
                Err(DialError::NoAddresses)
            };

        if let Err(error) = &result {
            log::debug!(
                "New dialing attempt to peer {:?} failed: {:?}.",
                peer_id, error);
            me.behaviour.inject_dial_failure(peer_id);
        }

        result
    }

    /// Returns an iterator over the addresses the network's listeners are
    /// listening on.
    pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
        me.network.listen_addrs()
    }

    /// Returns an iterator over the external addresses known to the swarm.
    pub fn external_addresses(me: &Self) -> impl Iterator<Item = &Multiaddr> {
        me.external_addrs.iter()
    }

    /// Returns the peer id of the local node.
    pub fn local_peer_id(me: &Self) -> &PeerId {
        me.network.local_peer_id()
    }

    /// Adds an external address to the set reported to the behaviour through
    /// `PollParameters::external_addresses`.
    pub fn add_external_address(me: &mut Self, addr: Multiaddr) {
        me.external_addrs.add(addr)
    }

    /// Returns the connection info of some established connection to the
    /// peer, or `None` if the peer is not connected.
    pub fn connection_info(me: &mut Self, peer_id: &PeerId) -> Option<TConnInfo> {
        me.network.peer(peer_id.clone())
            .into_connected()
            .map(|mut peer| peer.some_connection().info().clone())
    }

    /// Bans a peer: it is disconnected if currently connected, future
    /// connections to it are closed as soon as they are established, and
    /// dialing requests from the behaviour are refused.
    pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
        me.banned_peers.insert(peer_id.clone());
        if let Some(c) = me.network.peer(peer_id).into_connected() {
            c.disconnect();
        }
    }

    /// Removes a peer from the ban list.
    pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
        me.banned_peers.remove(&peer_id);
    }

    /// Waits for and returns the next event of the swarm, whether produced by
    /// the behaviour or by the swarm itself.
    pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
        future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
    }

    /// Waits for and returns the next event produced by the behaviour,
    /// silently discarding all other swarm events.
    pub async fn next(&mut self) -> TBehaviour::OutEvent {
        future::poll_fn(move |cx| {
            loop {
                let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
                if let SwarmEvent::Behaviour(event) = event {
                    return Poll::Ready(event);
                }
            }
        }).await
    }

    /// Drives the network and the behaviour until one of them produces an
    /// event for the caller or neither can make progress.
    ///
    /// Each loop iteration first polls the network and forwards what it
    /// reports to the behaviour, then retries delivery of a pending handler
    /// event, and finally polls the behaviour and executes the action it
    /// requests.
    fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
    {
        // Taking a plain `&mut` is possible because `ExpandedSwarm` is `Unpin`;
        // it allows borrowing individual fields independently below.
        let this = &mut *self;

        loop {
            let mut network_not_ready = false;

            // First let the network make progress.
            match this.network.poll(cx) {
                Poll::Pending => network_not_ready = true,
                Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
                    let peer = connection.peer_id().clone();
                    let connection = connection.id();
                    this.behaviour.inject_event(peer, connection, event);
                },
                Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
                    let peer = connection.peer_id();
                    let connection = connection.id();
                    this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
                },
                Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
                    let peer_id = connection.peer_id().clone();
                    let endpoint = connection.endpoint().clone();
                    if this.banned_peers.contains(&peer_id) {
                        this.network.peer(peer_id.clone())
                            .into_connected()
                            .expect("the Network just notified us that we were connected; QED")
                            .disconnect();
                        return Poll::Ready(SwarmEvent::BannedPeer {
                            peer_id,
                            endpoint,
                        });
                    } else {
                        log::debug!("Connection established: {:?}; Total (peer): {}.",
                            connection.connected(), num_established);
                        this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
                        // Only the first connection to a peer counts as "connected".
                        if num_established.get() == 1 {
                            this.behaviour.inject_connected(&peer_id);
                        }
                        return Poll::Ready(SwarmEvent::ConnectionEstablished {
                            peer_id, num_established, endpoint
                        });
                    }
                },
                Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => {
                    log::debug!("Connection {:?} closed: {:?}", connected, error);
                    let info = connected.info;
                    let endpoint = connected.endpoint;
                    this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint);
                    // Only losing the last connection counts as "disconnected".
                    if num_established == 0 {
                        this.behaviour.inject_disconnected(info.peer_id());
                    }
                    return Poll::Ready(SwarmEvent::ConnectionClosed {
                        peer_id: info.peer_id().clone(),
                        endpoint,
                        cause: error,
                        num_established,
                    });
                },
                Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => {
                    let handler = this.behaviour.new_handler();
                    let local_addr = incoming.local_addr().clone();
                    let send_back_addr = incoming.send_back_addr().clone();
                    if let Err(e) = incoming.accept(handler.into_node_handler_builder()) {
                        log::warn!("Incoming connection rejected: {:?}", e);
                    }
                    return Poll::Ready(SwarmEvent::IncomingConnection {
                        local_addr,
                        send_back_addr,
                    });
                },
                Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
                    log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
                    if !this.listened_addrs.contains(&listen_addr) {
                        this.listened_addrs.push(listen_addr.clone())
                    }
                    this.behaviour.inject_new_listen_addr(&listen_addr);
                    return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
                }
                Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
                    log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
                    this.listened_addrs.retain(|a| a != &listen_addr);
                    this.behaviour.inject_expired_listen_addr(&listen_addr);
                    return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
                }
                Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
                    log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
                    for addr in addresses.iter() {
                        this.behaviour.inject_expired_listen_addr(addr);
                    }
                    // The behaviour was just told that these addresses expired, so
                    // the swarm must forget them as well. Otherwise they would still
                    // be handed out via `PollParameters::listened_addresses` and be
                    // filtered out of future dialing attempts.
                    this.listened_addrs.retain(|a| !addresses.contains(&*a));
                    this.behaviour.inject_listener_closed(listener_id, match &reason {
                        Ok(()) => Ok(()),
                        Err(err) => Err(err),
                    });
                    return Poll::Ready(SwarmEvent::ListenerClosed {
                        addresses,
                        reason,
                    });
                }
                Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
                    this.behaviour.inject_listener_error(listener_id, &error);
                    return Poll::Ready(SwarmEvent::ListenerError {
                        error,
                    });
                },
                Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
                    log::debug!("Incoming connection failed: {:?}", error);
                    return Poll::Ready(SwarmEvent::IncomingConnectionError {
                        local_addr,
                        send_back_addr,
                        error,
                    });
                },
                Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
                    log::debug!(
                        "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
                        peer_id, multiaddr, error, attempts_remaining);
                    this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
                    // The dialing attempt as a whole only failed once no address is left.
                    if attempts_remaining == 0 {
                        this.behaviour.inject_dial_failure(&peer_id);
                    }
                    return Poll::Ready(SwarmEvent::UnreachableAddr {
                        peer_id,
                        address: multiaddr,
                        error,
                        attempts_remaining,
                    });
                },
                Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
                    log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
                        multiaddr, error);
                    this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
                    return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
                        address: multiaddr,
                        error,
                    });
                },
            }

            // Before polling the behaviour again, retry delivery of an event that
            // could not be handed to the connection handler(s) earlier. If it still
            // cannot be delivered it is stored again and `Pending` is returned.
            // A peer or connection that is gone in the meantime drops the event.
            if let Some((peer_id, handler, event)) = this.pending_event.take() {
                if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
                    match handler {
                        PendingNotifyHandler::One(conn_id) =>
                            if let Some(mut conn) = peer.connection(conn_id) {
                                if let Some(event) = notify_one(&mut conn, event, cx) {
                                    this.pending_event = Some((peer_id, handler, event));
                                    return Poll::Pending
                                }
                            },
                        PendingNotifyHandler::Any(ids) => {
                            if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
                                let handler = PendingNotifyHandler::Any(ids);
                                this.pending_event = Some((peer_id, handler, event));
                                return Poll::Pending
                            }
                        }
                        PendingNotifyHandler::All(ids) => {
                            if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
                                let handler = PendingNotifyHandler::All(ids);
                                this.pending_event = Some((peer_id, handler, event));
                                return Poll::Pending
                            }
                        }
                    }
                }
            }

            debug_assert!(this.pending_event.is_none());

            // Now let the behaviour make progress.
            let behaviour_poll = {
                let mut parameters = SwarmPollParameters {
                    local_peer_id: this.network.local_peer_id(),
                    supported_protocols: &this.supported_protocols,
                    listened_addrs: &this.listened_addrs,
                    external_addrs: &this.external_addrs
                };
                this.behaviour.poll(cx, &mut parameters)
            };

            match behaviour_poll {
                // Neither the network nor the behaviour can make progress.
                Poll::Pending if network_not_ready => return Poll::Pending,
                // The network produced something in this iteration: poll it again.
                Poll::Pending => (),
                Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
                    return Poll::Ready(SwarmEvent::Behaviour(event))
                },
                Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
                    if let Err(error) = ExpandedSwarm::dial_addr(&mut *this, address) {
                        log::debug!(
                            "Dialing an address requested by the behaviour failed: {:?}.",
                            error);
                    }
                },
                Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
                    if this.banned_peers.contains(&peer_id) {
                        this.behaviour.inject_dial_failure(&peer_id);
                    } else {
                        let condition_matched = match condition {
                            DialPeerCondition::Disconnected
                                if this.network.is_disconnected(&peer_id) => true,
                            DialPeerCondition::NotDialing
                                if !this.network.is_dialing(&peer_id) => true,
                            _ => false
                        };
                        if condition_matched {
                            if ExpandedSwarm::dial(this, &peer_id).is_ok() {
                                return Poll::Ready(SwarmEvent::Dialing(peer_id))
                            }
                        } else {
                            // No new attempt is started. If a dialing attempt is
                            // already in progress, the behaviour's current addresses
                            // for the peer are added to it.
                            log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
                                peer_id, condition);
                            let self_listening = &this.listened_addrs;
                            if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() {
                                let addrs = this.behaviour.addresses_of_peer(peer.id());
                                let mut attempt = peer.some_attempt();
                                for a in addrs {
                                    if !self_listening.contains(&a) {
                                        attempt.add_address(a);
                                    }
                                }
                            }
                        }
                    }
                },
                Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
                    // If the targeted handler(s) cannot take the event right now, it
                    // is stored in `pending_event` and retried on the next poll.
                    if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
                        match handler {
                            NotifyHandler::One(connection) => {
                                if let Some(mut conn) = peer.connection(connection) {
                                    if let Some(event) = notify_one(&mut conn, event, cx) {
                                        let handler = PendingNotifyHandler::One(connection);
                                        this.pending_event = Some((peer_id, handler, event));
                                        return Poll::Pending
                                    }
                                }
                            }
                            NotifyHandler::Any => {
                                let ids = peer.connections().into_ids().collect();
                                if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
                                    let handler = PendingNotifyHandler::Any(ids);
                                    this.pending_event = Some((peer_id, handler, event));
                                    return Poll::Pending
                                }
                            }
                            NotifyHandler::All => {
                                let ids = peer.connections().into_ids().collect();
                                if let Some((event, ids)) = notify_all(ids, &mut peer, event, cx) {
                                    let handler = PendingNotifyHandler::All(ids);
                                    this.pending_event = Some((peer_id, handler, event));
                                    return Poll::Pending
                                }
                            }
                        }
                    }
                },
                Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
                    for addr in this.network.address_translation(&address) {
                        // Only tell the behaviour about addresses it has not seen yet.
                        if this.external_addrs.iter().all(|a| *a != addr) {
                            this.behaviour.inject_new_external_addr(&addr);
                        }
                        this.external_addrs.add(addr);
                    }
                },
            }
        }
    }
}
/// Describes which connection handler(s) a stored, not yet delivered event
/// (`ExpandedSwarm::pending_event`) is still waiting for.
enum PendingNotifyHandler {
/// The event is for the handler of exactly this connection.
One(ConnectionId),
/// The event is for any one of the listed connections whose handlers were
/// not ready at the last attempt.
Any(SmallVec<[ConnectionId; 10]>),
/// The event still has to be delivered to every listed connection; these
/// are the ones whose handlers were not ready at the last attempt.
All(SmallVec<[ConnectionId; 10]>),
}
/// Tries to deliver `event` to the handler of a single connection.
///
/// Returns `Some(event)` if the handler is not yet ready to receive the
/// event, handing it back so the caller can retry later. Returns `None` if
/// the event was handed to the connection or if the readiness check failed
/// (in which case the event is dropped).
fn notify_one<'a, TInEvent, TConnInfo, TPeerId>(
    conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>,
    event: TInEvent,
    cx: &mut Context,
) -> Option<TInEvent>
where
    TPeerId: Eq + std::hash::Hash + Clone,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>
{
    let readiness = conn.poll_ready_notify_handler(cx);
    if readiness.is_pending() {
        return Some(event);
    }
    if let Poll::Ready(Ok(())) = readiness {
        // A failure to hand over the event at this point is deliberately ignored.
        let _ = conn.notify_handler(event);
    }
    None
}
/// Tries to deliver `event` to the handler of any one of the given
/// connections of a peer, stopping at the first successful delivery.
///
/// Returns `Some` with the event and the ids of the connections whose
/// handlers were not ready if the event could not be delivered but may be
/// deliverable later. Returns `None` if the event was delivered or if no
/// connection is left to wait for (the event is then dropped).
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
    ids: SmallVec<[ConnectionId; 10]>,
    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
    event: TInEvent,
    cx: &mut Context,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash + Clone,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>
{
    let mut not_ready = SmallVec::new();
    let mut undelivered = Some(event); // (1)

    for id in ids {
        let mut conn = match peer.connection(id) {
            Some(conn) => conn,
            // The connection no longer exists.
            None => continue,
        };
        match conn.poll_ready_notify_handler(cx) {
            Poll::Pending => not_ready.push(id),
            Poll::Ready(Err(())) => {}
            Poll::Ready(Ok(())) => {
                let e = undelivered.take().expect("by (1),(2)");
                match conn.notify_handler(e) {
                    Ok(_) => break,
                    // Put the event back so the next connection can be tried.
                    Err(e) => undelivered = Some(e), // (2)
                }
            }
        }
    }

    match undelivered {
        Some(e) if !not_ready.is_empty() => Some((e, not_ready)),
        _ => None,
    }
}
/// Tries to deliver a clone of `event` to the handlers of all given
/// connections of a peer.
///
/// Returns `Some` with the event and the ids of the connections whose
/// handlers were not ready, so that delivery to those can be retried later.
/// Returns `None` once no connection is left to wait for.
fn notify_all<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>(
    ids: SmallVec<[ConnectionId; 10]>,
    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
    event: TInEvent,
    cx: &mut Context,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
    TTrans: Transport,
    TInEvent: Clone,
    THandler: IntoConnectionHandler<TConnInfo>,
    TPeerId: Eq + Hash + Clone,
    TConnInfo: ConnectionInfo<PeerId = TPeerId>
{
    // With a single target the event can be moved instead of cloned.
    if ids.len() == 1 {
        let only = ids[0];
        if let Some(mut conn) = peer.connection(only) {
            return notify_one(&mut conn, event, cx).map(|e| (e, ids));
        }
    }

    let mut still_pending = SmallVec::new();
    for id in ids {
        let mut conn = match peer.connection(id) {
            Some(conn) => conn,
            // The connection no longer exists.
            None => continue,
        };
        match conn.poll_ready_notify_handler(cx) {
            Poll::Ready(Ok(())) => {
                // A failure to hand over the event is deliberately ignored.
                let _ = conn.notify_handler(event.clone());
            }
            Poll::Ready(Err(())) => {}
            Poll::Pending => still_pending.push(id),
        }
    }

    if still_pending.is_empty() {
        None
    } else {
        Some((event, still_pending))
    }
}
/// Exposes the swarm as a stream of behaviour events.
///
/// Swarm events that do not originate from the behaviour are consumed and
/// discarded; use `ExpandedSwarm::next_event` to observe them.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> Stream
    for ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where
    TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
    THandler: IntoProtocolsHandler + Send + 'static,
    TInEvent: Clone + Send + 'static,
    TOutEvent: Send + 'static,
    THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
    TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
    type Item = TBehaviour::OutEvent;

    /// Polls the swarm until the behaviour produces an event or no further
    /// progress can be made. Never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            match futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx)) {
                SwarmEvent::Behaviour(event) => return Poll::Ready(Some(event)),
                // Not a behaviour event: drop it and keep polling.
                _ => continue,
            }
        }
    }
}
/// Marks the swarm's stream as one that never terminates.
impl<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Clone + Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
/// Always returns `false`: the `Stream` implementation in this file only
/// ever yields `Poll::Pending` or `Poll::Ready(Some(_))`.
fn is_terminated(&self) -> bool {
false
}
}
/// Borrowed view of swarm state that is handed to the behaviour as its
/// `PollParameters` each time the swarm polls it.
pub struct SwarmPollParameters<'a> {
/// Peer id of the local node, taken from the network.
local_peer_id: &'a PeerId,
/// Protocol names supported by the swarm's handlers.
supported_protocols: &'a [Vec<u8>],
/// Addresses the swarm is currently listening on.
listened_addrs: &'a [Multiaddr],
/// External addresses known to the swarm.
external_addrs: &'a Addresses,
}
/// Hands the borrowed swarm state to the behaviour. Every iterator owns its
/// items, so each call copies the underlying collection.
impl<'a> PollParameters for SwarmPollParameters<'a> {
    type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
    type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
    type ExternalAddressesIter = AddressIntoIter;

    /// Returns an owned copy of the supported protocol names.
    fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
        let protocols: Vec<Vec<u8>> = self.supported_protocols.to_vec();
        protocols.into_iter()
    }

    /// Returns an owned copy of the addresses currently listened on.
    fn listened_addresses(&self) -> Self::ListenedAddressesIter {
        let addresses: Vec<Multiaddr> = self.listened_addrs.to_vec();
        addresses.into_iter()
    }

    /// Returns an owned copy of the known external addresses.
    fn external_addresses(&self) -> Self::ExternalAddressesIter {
        let addresses = self.external_addrs.clone();
        addresses.into_iter()
    }

    /// Returns the peer id of the local node.
    fn local_peer_id(&self) -> &PeerId {
        self.local_peer_id
    }
}
/// Builder for a [`Swarm`]: holds the pieces a swarm is made of and lets the
/// network configuration be adjusted before `build` is called.
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
/// Peer id of the local node.
local_peer_id: PeerId,
/// The transport, already boxed and with its muxer boxed and its error
/// converted to `io::Error`.
transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
/// The behaviour the swarm will drive.
behaviour: TBehaviour,
/// Configuration passed to the underlying `Network`.
network_config: NetworkConfig,
}
impl<TBehaviour, TConnInfo> SwarmBuilder<TBehaviour, TConnInfo>
where
    TBehaviour: NetworkBehaviour,
    TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
    /// Creates a builder from a transport, a behaviour and the local peer id,
    /// starting from the default network configuration.
    ///
    /// The transport is boxed: its muxer is wrapped in a `StreamMuxerBox` and
    /// its error is converted to an `io::Error` of kind `Other`.
    pub fn new<TTrans, TMuxer>(transport: TTrans, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
    where
        TMuxer: StreamMuxer + Send + Sync + 'static,
        <TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
        <TMuxer as StreamMuxer>::Substream: Send + 'static,
        TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
        TTrans::Error: Send + Sync + 'static,
        TTrans::Listener: Send + 'static,
        TTrans::ListenerUpgrade: Send + 'static,
        TTrans::Dial: Send + 'static,
    {
        let transport = transport
            .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
            .boxed();

        SwarmBuilder {
            local_peer_id,
            transport,
            behaviour,
            network_config: Default::default(),
        }
    }

    /// Sets the executor handed to the network. If none is set, `build` tries
    /// to create a thread pool.
    pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
        self.network_config.set_executor(e);
        self
    }

    /// Sets the notify-handler buffer size in the network configuration.
    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
        self.network_config.set_notify_handler_buffer_size(n);
        self
    }

    /// Sets the connection event buffer size in the network configuration.
    pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
        self.network_config.set_connection_event_buffer_size(n);
        self
    }

    /// Sets the incoming connection limit in the network configuration.
    pub fn incoming_connection_limit(mut self, n: usize) -> Self {
        self.network_config.set_incoming_limit(n);
        self
    }

    /// Sets the outgoing connection limit in the network configuration.
    pub fn outgoing_connection_limit(mut self, n: usize) -> Self {
        self.network_config.set_outgoing_limit(n);
        self
    }

    /// Sets the limit of established connections per peer in the network
    /// configuration.
    pub fn peer_connection_limit(mut self, n: usize) -> Self {
        self.network_config.set_established_per_peer_limit(n);
        self
    }

    /// Consumes the builder and creates the swarm.
    ///
    /// The supported protocol names are collected from the inbound protocol of
    /// a handler freshly created by the behaviour. If no executor was
    /// configured, a thread pool is created; if that fails, a warning is
    /// logged and the network is built without an executor.
    pub fn build(mut self) -> Swarm<TBehaviour, TConnInfo> {
        let supported_protocols = self.behaviour
            .new_handler()
            .inbound_protocol()
            .protocol_info()
            .into_iter()
            .map(|info| info.protocol_name().to_vec())
            .collect();

        let mut network_cfg = self.network_config;

        if network_cfg.executor().is_none() {
            /// Adapter that lets a futures `ThreadPool` act as an `Executor`.
            struct PoolWrapper(ThreadPool);
            impl Executor for PoolWrapper {
                fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
                    self.0.spawn_ok(f)
                }
            }

            match ThreadPoolBuilder::new()
                .name_prefix("libp2p-swarm-task-")
                .create()
            {
                // Box the wrapper exactly once; boxing an already boxed executor
                // would add a needless indirection to every spawned task.
                Ok(pool) => { network_cfg.set_executor(Box::new(PoolWrapper(pool))); },
                Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err)
            }
        }

        let network = Network::new(self.transport, self.local_peer_id, network_cfg);

        ExpandedSwarm {
            network,
            behaviour: self.behaviour,
            supported_protocols,
            listened_addrs: SmallVec::new(),
            external_addrs: Addresses::default(),
            banned_peers: HashSet::new(),
            pending_event: None
        }
    }
}
/// Error returned by `ExpandedSwarm::dial` when a dialing attempt to a peer
/// could not be started.
#[derive(Debug)]
pub enum DialError {
/// The network refused the dial; carries the `ConnectionLimit` error it
/// reported.
ConnectionLimit(ConnectionLimit),
/// The behaviour provided no address for the peer that the swarm is not
/// itself listening on.
NoAddresses
}
/// Human-readable description of a dialing failure.
impl fmt::Display for DialError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let DialError::ConnectionLimit(limit) = self {
            return write!(f, "Dial error: {}", limit);
        }
        f.write_str("Dial error: no addresses for peer.")
    }
}
/// Exposes the underlying connection limit error, if any, as the source.
impl error::Error for DialError {
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = match self {
            DialError::ConnectionLimit(limit) => limit,
            // A missing address has no underlying cause.
            DialError::NoAddresses => return None,
        };
        Some(cause)
    }
}
/// A stateless `NetworkBehaviour` that knows no peer addresses, ignores
/// everything it is told and never produces an event.
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
/// No-op behaviour: every callback is ignored and polling never completes.
impl NetworkBehaviour for DummyBehaviour {
    type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
    type OutEvent = void::Void;

    /// Creates a default dummy handler.
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        Default::default()
    }

    /// This behaviour knows no addresses for any peer.
    fn addresses_of_peer(&mut self, _peer: &PeerId) -> Vec<Multiaddr> {
        vec![]
    }

    fn inject_connection_established(
        &mut self,
        _peer: &PeerId,
        _connection: &ConnectionId,
        _endpoint: &ConnectedPoint,
    ) {
    }

    fn inject_connected(&mut self, _peer: &PeerId) {}

    fn inject_connection_closed(
        &mut self,
        _peer: &PeerId,
        _connection: &ConnectionId,
        _endpoint: &ConnectedPoint,
    ) {
    }

    fn inject_disconnected(&mut self, _peer: &PeerId) {}

    fn inject_event(
        &mut self,
        _peer: PeerId,
        _connection: ConnectionId,
        _event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
    ) {
    }

    /// Always pending: this behaviour never requests an action.
    fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) ->
        Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
        ProtocolsHandler>::InEvent, Self::OutEvent>>
    {
        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
    use crate::{DummyBehaviour, SwarmBuilder};
    use libp2p_core::{
        PeerId,
        PublicKey,
        identity,
        transport::dummy::{DummyStream, DummyTransport}
    };
    use libp2p_mplex::Multiplex;

    /// Generates the public half of a fresh ed25519 keypair.
    fn get_random_id() -> PublicKey {
        let keypair = identity::Keypair::generate_ed25519();
        keypair.public()
    }

    /// A limit set on the builder must be visible on the built network.
    #[test]
    fn test_build_swarm() {
        let local_peer_id: PeerId = get_random_id().into();
        let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
        let builder = SwarmBuilder::new(transport, DummyBehaviour {}, local_peer_id);
        let swarm = builder.incoming_connection_limit(4).build();
        assert_eq!(swarm.network.incoming_limit(), Some(4));
    }

    /// Without an explicit limit the built network must not have one.
    #[test]
    fn test_build_swarm_with_max_listeners_none() {
        let local_peer_id: PeerId = get_random_id().into();
        let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
        let swarm = SwarmBuilder::new(transport, DummyBehaviour {}, local_peer_id).build();
        assert!(swarm.network.incoming_limit().is_none())
    }
}