mod behaviour;
mod registry;
mod upgrade;
pub mod protocols_handler;
pub mod toggle;
pub use behaviour::{
NetworkBehaviour,
NetworkBehaviourAction,
NetworkBehaviourEventProcess,
PollParameters
};
pub use protocols_handler::{
IntoProtocolsHandler,
IntoProtocolsHandlerSelect,
KeepAlive,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerSelect,
ProtocolsHandlerUpgrErr,
OneShotHandler,
SubstreamProtocol
};
/// Shorthand for a `Negotiated` wrapper around a `Substream` of a boxed
/// stream muxer (`StreamMuxerBox`).
// NOTE(review): presumably the substream type handed to protocol handlers once
// protocol negotiation has completed — confirm against `libp2p_core::Negotiated`.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapperError};
use futures::{prelude::*, executor::{ThreadPool, ThreadPoolBuilder}, stream::FusedStream};
use libp2p_core::{
Executor, Negotiated, Transport, Multiaddr, PeerId, ProtocolName,
muxing::{StreamMuxer, StreamMuxerBox},
nodes::{
ListenerId, Substream,
collection::ConnectionInfo,
network::{self, Network, NetworkEvent}
},
transport::{
boxed::Boxed as BoxTransport,
TransportError,
}
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use upgrade::UpgradeInfoSend as _;
/// Convenience alias for `ExpandedSwarm`.
///
/// Only the behaviour type (and optionally the connection-info type, which
/// defaults to `PeerId`) has to be named; the handler type, its input and
/// output event types and its error type are all derived from
/// `TBehaviour::ProtocolsHandler`.
pub type Swarm<TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error,
TConnInfo,
>;
/// Event returned by `ExpandedSwarm::next_event`.
///
/// `TBvEv` is the event type produced by the network behaviour.
#[derive(Debug)]
pub enum SwarmEvent<TBvEv> {
/// Event generated by the `NetworkBehaviour` (its `GenerateEvent` action).
Behaviour(TBvEv),
/// A connection to the given peer was reported by the network and the peer
/// is not banned. The behaviour has already been notified.
Connected(PeerId),
/// The connection to the given peer was closed. The behaviour has already
/// been notified.
Disconnected(PeerId),
/// The network reported a new listener address.
NewListenAddr(Multiaddr),
/// The network reported that a listener address has expired.
ExpiredListenAddr(Multiaddr),
/// Dialing an address failed.
UnreachableAddr {
/// Peer that was being dialed, or `None` when the dial did not target a
/// known peer.
peer_id: Option<PeerId>,
/// Address that could not be reached.
address: Multiaddr,
/// Error reported by the network for this dial attempt.
error: Box<dyn error::Error + Send>,
},
/// The behaviour requested a dial to the given (non-banned) peer and the
/// swarm has acted on that request.
StartConnect(PeerId),
}
/// Combines a `Network` with a `NetworkBehaviour`: network events are injected
/// into the behaviour, and the actions the behaviour returns are executed on
/// the network.
///
/// Usually named through the shorter `Swarm` alias.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId> {
/// Underlying network, used for listening and dialing and polled for events.
network: Network<
BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
NodeHandlerWrapperError<THandlerErr>,
TConnInfo,
PeerId,
>,
/// The network behaviour; also the target of the `Deref`/`DerefMut` impls.
behaviour: TBehaviour,
/// Protocol names collected at build time from the inbound protocol of a
/// handler created by the behaviour; exposed through `PollParameters`.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
/// Listener addresses reported by the network that have not expired yet;
/// exposed through `PollParameters`.
listened_addrs: SmallVec<[Multiaddr; 8]>,
/// Addresses added with `add_external_address` or derived from addresses
/// the behaviour reported as observed; exposed through `PollParameters`.
external_addrs: Addresses,
/// Peers whose connections are closed on sight and whose dial requests
/// from the behaviour are turned into dial failures.
banned_peers: HashSet<PeerId>,
/// Event for a peer whose connection was not ready to accept it; delivery
/// is retried on the next poll.
send_event_to_complete: Option<(PeerId, TInEvent)>
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Deref for
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
{
    type Target = TBehaviour;

    /// Gives shared access to the wrapped network behaviour.
    fn deref(&self) -> &Self::Target {
        let Self { behaviour, .. } = self;
        behaviour
    }
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> DerefMut for
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
{
    /// Gives exclusive access to the wrapped network behaviour.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { behaviour, .. } = self;
        behaviour
    }
}
// `ExpandedSwarm` is declared `Unpin` unconditionally, whatever its type
// parameters are. `next_event`, `next` and `poll_next_event` depend on this:
// they build a `Pin` with `Pin::new(self)` and reborrow `&mut Self` out of a
// `Pin<&mut Self>`, both of which require `Self: Unpin`.
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
{
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
      TInEvent: Send + 'static,
      TOutEvent: Send + 'static,
      TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
      THandlerErr: error::Error + Send + 'static,
      THandler: IntoProtocolsHandler + Send + 'static,
      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr>,
{
    /// Builds a new swarm with default settings.
    ///
    /// Equivalent to `SwarmBuilder::new(transport, behaviour, local_peer_id).build()`.
    pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
    where
        TMuxer: StreamMuxer + Send + Sync + 'static,
        <TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
        <TMuxer as StreamMuxer>::Substream: Send + 'static,
        TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
        TTransport::Error: Send + Sync + 'static,
        TTransport::Listener: Send + 'static,
        TTransport::ListenerUpgrade: Send + 'static,
        TTransport::Dial: Send + 'static,
    {
        SwarmBuilder::new(transport, behaviour, local_peer_id)
            .build()
    }

    // NOTE: the functions below take `me` instead of `self`, so they are called
    // as `ExpandedSwarm::listen_on(&mut swarm, ..)`. Presumably this keeps them
    // from shadowing methods of the behaviour reachable through `Deref`.

    /// Starts listening on `addr` by forwarding to the underlying network.
    ///
    /// Returns the identifier of the new listener, or the transport error.
    pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
        me.network.listen_on(addr)
    }

    /// Removes the listener with the given identifier by forwarding to the
    /// underlying network, whose result is returned unchanged.
    pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
        me.network.remove_listener(id)
    }

    /// Dials `addr` without expecting a particular peer, using a fresh handler
    /// obtained from the behaviour.
    pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError<io::Error>> {
        let handler = me.behaviour.new_handler();
        me.network.dial(addr, handler.into_node_handler_builder())
    }

    /// Tries to reach `peer_id` using the addresses the behaviour knows for it.
    ///
    /// * Not connected: starts connecting; if that fails immediately the
    ///   behaviour is told about the dial failure.
    /// * Connection attempt already pending: the addresses are appended to the
    ///   ongoing attempt.
    /// * Already connected, or `peer_id` is the local node: nothing happens.
    pub fn dial(me: &mut Self, peer_id: PeerId) {
        let addrs = me.behaviour.addresses_of_peer(&peer_id);
        match me.network.peer(peer_id.clone()) {
            network::Peer::NotConnected(peer) => {
                let handler = me.behaviour.new_handler().into_node_handler_builder();
                if peer.connect_iter(addrs, handler).is_err() {
                    me.behaviour.inject_dial_failure(&peer_id);
                }
            },
            network::Peer::PendingConnect(mut peer) => {
                peer.append_multiaddr_attempts(addrs)
            },
            network::Peer::Connected(_) | network::Peer::LocalNode => {}
        }
    }

    /// Iterates over the addresses the underlying network reports as listened on.
    pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
        me.network.listen_addrs()
    }

    /// Iterates over the recorded external addresses of the local node.
    pub fn external_addresses(me: &Self) -> impl Iterator<Item = &Multiaddr> {
        me.external_addrs.iter()
    }

    /// Returns the peer id of the local node.
    pub fn local_peer_id(me: &Self) -> &PeerId {
        // The network already hands out a reference; no extra borrow needed.
        me.network.local_peer_id()
    }

    /// Records `addr` as an external address of the local node.
    pub fn add_external_address(me: &mut Self, addr: Multiaddr) {
        me.external_addrs.add(addr)
    }

    /// Returns a clone of the connection information for `peer_id`, or `None`
    /// if the network has no established connection to that peer.
    pub fn connection_info(me: &mut Self, peer_id: &PeerId) -> Option<TConnInfo> {
        me.network
            .peer(peer_id.clone())
            .into_connected()
            .map(|mut n| n.connection_info().clone())
    }

    /// Bans a peer.
    ///
    /// An existing connection to the peer is closed right away. While banned,
    /// new connections to the peer are closed as soon as they are reported and
    /// dial requests from the behaviour are answered with a dial failure.
    pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
        me.banned_peers.insert(peer_id.clone());
        if let Some(c) = me.network.peer(peer_id).into_connected() {
            c.close();
        }
    }

    /// Lifts a ban previously set with `ban_peer_id`.
    pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
        me.banned_peers.remove(&peer_id);
    }

    /// Waits for and returns the next swarm event of any kind.
    pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent> {
        future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
    }

    /// Waits for and returns the next event produced by the behaviour,
    /// discarding every other kind of swarm event along the way.
    pub async fn next(&mut self) -> TBehaviour::OutEvent {
        future::poll_fn(move |cx| {
            loop {
                let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
                if let SwarmEvent::Behaviour(event) = event {
                    return Poll::Ready(event);
                }
            }
        }).await
    }

    /// Drives the network and the behaviour until a `SwarmEvent` is available.
    ///
    /// Each loop iteration
    /// 1. polls the network once and forwards the event to the behaviour,
    /// 2. retries delivery of an event that a connection could not accept earlier,
    /// 3. polls the behaviour once and executes the action it returns.
    ///
    /// `Poll::Pending` is returned when both the network and the behaviour are
    /// pending in the same iteration, or when an event for a connection has to
    /// wait for that connection to become ready.
    fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<SwarmEvent<TBehaviour::OutEvent>>
    {
        // Plain `&mut Self` so that disjoint fields can be borrowed
        // independently below.
        let this = &mut *self;

        loop {
            let mut network_not_ready = false;

            // Step 1: network events.
            match this.network.poll(cx) {
                Poll::Pending => network_not_ready = true,
                Poll::Ready(NetworkEvent::NodeEvent { conn_info, event }) => {
                    this.behaviour.inject_node_event(conn_info.peer_id().clone(), event);
                },
                Poll::Ready(NetworkEvent::Connected { conn_info, endpoint }) => {
                    if this.banned_peers.contains(conn_info.peer_id()) {
                        // Banned peers are disconnected without the behaviour
                        // ever hearing about the connection.
                        this.network.peer(conn_info.peer_id().clone())
                            .into_connected()
                            .expect("the Network just notified us that we were connected; QED")
                            .close();
                    } else {
                        this.behaviour.inject_connected(conn_info.peer_id().clone(), endpoint);
                        return Poll::Ready(SwarmEvent::Connected(conn_info.peer_id().clone()));
                    }
                },
                Poll::Ready(NetworkEvent::NodeClosed { conn_info, endpoint, error }) => {
                    log::trace!("Connection {:?} with endpoint {:?} closed by {:?}",
                        conn_info, endpoint, error);
                    this.behaviour.inject_disconnected(conn_info.peer_id(), endpoint);
                    return Poll::Ready(SwarmEvent::Disconnected(conn_info.peer_id().clone()));
                },
                Poll::Ready(NetworkEvent::Replaced { new_info, closed_endpoint, endpoint, .. }) => {
                    this.behaviour.inject_replaced(new_info.peer_id().clone(), closed_endpoint, endpoint);
                },
                Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => {
                    // Every incoming connection is accepted with a fresh handler.
                    let handler = this.behaviour.new_handler();
                    incoming.accept(handler.into_node_handler_builder());
                },
                Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
                    if !this.listened_addrs.contains(&listen_addr) {
                        this.listened_addrs.push(listen_addr.clone())
                    }
                    this.behaviour.inject_new_listen_addr(&listen_addr);
                    return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
                }
                Poll::Ready(NetworkEvent::ExpiredListenerAddress { listen_addr, .. }) => {
                    this.listened_addrs.retain(|a| a != &listen_addr);
                    this.behaviour.inject_expired_listen_addr(&listen_addr);
                    return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
                }
                Poll::Ready(NetworkEvent::ListenerClosed { listener_id, .. }) =>
                    this.behaviour.inject_listener_closed(listener_id),
                Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) =>
                    this.behaviour.inject_listener_error(listener_id, &error),
                // Errors on incoming connections are deliberately ignored.
                Poll::Ready(NetworkEvent::IncomingConnectionError { .. }) => {},
                Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, new_state }) => {
                    this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
                    // Only report the whole dial as failed once the network
                    // says the peer ended up not connected.
                    if let network::PeerState::NotConnected = new_state {
                        this.behaviour.inject_dial_failure(&peer_id);
                    }
                    return Poll::Ready(SwarmEvent::UnreachableAddr {
                        peer_id: Some(peer_id),
                        address: multiaddr,
                        error: Box::new(error),
                    });
                },
                Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
                    this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
                    return Poll::Ready(SwarmEvent::UnreachableAddr {
                        peer_id: None,
                        address: multiaddr,
                        error: Box::new(error),
                    });
                },
            }

            // Step 2: retry an event whose connection was not ready earlier.
            // The behaviour is not polled again until it has been handed over,
            // so at most one such event is ever stored. If the peer is no
            // longer connected the event is dropped.
            if let Some((id, pending)) = this.send_event_to_complete.take() {
                if let Some(mut peer) = this.network.peer(id.clone()).into_connected() {
                    match peer.poll_ready_event(cx) {
                        Poll::Ready(()) => peer.start_send_event(pending),
                        Poll::Pending => {
                            this.send_event_to_complete = Some((id, pending));
                            return Poll::Pending
                        },
                    }
                }
            }

            // Step 3: poll the behaviour and act on what it asks for.
            let behaviour_poll = {
                let mut parameters = SwarmPollParameters {
                    local_peer_id: this.network.local_peer_id(),
                    supported_protocols: &this.supported_protocols,
                    listened_addrs: &this.listened_addrs,
                    external_addrs: &this.external_addrs
                };
                this.behaviour.poll(cx, &mut parameters)
            };

            match behaviour_poll {
                Poll::Pending if network_not_ready => return Poll::Pending,
                // The network made progress this round: go around again.
                Poll::Pending => (),
                Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
                    return Poll::Ready(SwarmEvent::Behaviour(event))
                },
                Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
                    // An immediate dial error is deliberately ignored here.
                    let _ = ExpandedSwarm::dial_addr(&mut *this, address);
                },
                Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => {
                    if this.banned_peers.contains(&peer_id) {
                        this.behaviour.inject_dial_failure(&peer_id);
                    } else {
                        ExpandedSwarm::dial(&mut *this, peer_id.clone());
                        return Poll::Ready(SwarmEvent::StartConnect(peer_id))
                    }
                },
                Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => {
                    // Events for peers that are not connected are dropped.
                    if let Some(mut peer) = this.network.peer(peer_id.clone()).into_connected() {
                        if let Poll::Ready(()) = peer.poll_ready_event(cx) {
                            peer.start_send_event(event);
                        } else {
                            debug_assert!(this.send_event_to_complete.is_none());
                            this.send_event_to_complete = Some((peer_id, event));
                            return Poll::Pending;
                        }
                    }
                },
                Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
                    for addr in this.network.address_translation(&address) {
                        // The behaviour is only told about addresses that were
                        // not recorded before; `add` is called either way.
                        if this.external_addrs.iter().all(|a| *a != addr) {
                            this.behaviour.inject_new_external_addr(&addr);
                        }
                        this.external_addrs.add(addr);
                    }
                },
            }
        }
    }
}
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandlerErr: error::Error + Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
type Item = TBehaviour::OutEvent;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
if let SwarmEvent::Behaviour(event) = event {
return Poll::Ready(Some(event));
}
}
}
}
/// The swarm's `Stream` implementation never yields `None`, so the stream is
/// never reported as terminated.
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandlerErr: error::Error + Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
/// Always `false`.
fn is_terminated(&self) -> bool {
false
}
}
/// `PollParameters` implementation handed to the behaviour on every poll.
///
/// It only borrows state owned by the swarm for the duration of the poll.
pub struct SwarmPollParameters<'a> {
/// Peer id of the local node, as reported by the network.
local_peer_id: &'a PeerId,
/// Protocol names supported by the swarm's handlers.
supported_protocols: &'a [Vec<u8>],
/// Addresses the swarm currently records as listened on.
listened_addrs: &'a [Multiaddr],
/// External addresses currently recorded by the swarm.
external_addrs: &'a Addresses,
}
impl<'a> PollParameters for SwarmPollParameters<'a> {
type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
type ExternalAddressesIter = AddressIntoIter;
fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
self.supported_protocols.to_vec().into_iter()
}
fn listened_addresses(&self) -> Self::ListenedAddressesIter {
self.listened_addrs.to_vec().into_iter()
}
fn external_addresses(&self) -> Self::ExternalAddressesIter {
self.external_addrs.clone().into_iter()
}
fn local_peer_id(&self) -> &PeerId {
self.local_peer_id
}
}
/// Builder for a `Swarm`, allowing optional settings to be configured before
/// the swarm is created with `build`.
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
/// Optional limit forwarded to `Network::new_with_incoming_limit`.
incoming_limit: Option<u32>,
/// Executor forwarded to the network; when `None`, `build` tries to create
/// a thread pool instead.
executor: Option<Box<dyn Executor + Send>>,
/// Peer id of the local node.
local_peer_id: PeerId,
/// Transport with its muxer boxed and its error converted to `io::Error`.
transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
/// Network behaviour the swarm will drive.
behaviour: TBehaviour,
}
impl<TBehaviour, TConnInfo> SwarmBuilder<TBehaviour, TConnInfo>
where TBehaviour: NetworkBehaviour,
      TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
    /// Creates a builder from a transport, a behaviour and the local peer id.
    ///
    /// The transport is type-erased here: its muxer is wrapped in a
    /// `StreamMuxerBox`, its error is converted into an `io::Error` of kind
    /// `Other`, and the transport itself is boxed. No incoming limit and no
    /// executor are set initially.
    pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
    where
        TMuxer: StreamMuxer + Send + Sync + 'static,
        <TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
        <TMuxer as StreamMuxer>::Substream: Send + 'static,
        TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
        TTransport::Error: Send + Sync + 'static,
        TTransport::Listener: Send + 'static,
        TTransport::ListenerUpgrade: Send + 'static,
        TTransport::Dial: Send + 'static,
    {
        let transport = transport
            .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
            .boxed();

        SwarmBuilder {
            incoming_limit: None,
            local_peer_id,
            executor: None,
            transport,
            behaviour,
        }
    }

    /// Sets the incoming limit that `build` forwards to
    /// `Network::new_with_incoming_limit`; `None` leaves it unset.
    pub fn incoming_limit(mut self, incoming_limit: Option<u32>) -> Self {
        self.incoming_limit = incoming_limit;
        self
    }

    /// Sets the executor handed to the network, replacing any executor set
    /// earlier.
    pub fn executor(mut self, executor: impl Executor + Send + 'static) -> Self {
        self.executor = Some(Box::new(executor));
        self
    }

    /// Same as `executor`, but takes a plain function or closure that is
    /// called with each future to run.
    pub fn executor_fn(mut self, executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + 'static) -> Self {
        // Adapter turning the closure into an `Executor`.
        struct SpawnImpl<F>(F);
        impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
            fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
                (self.0)(f)
            }
        }
        self.executor = Some(Box::new(SpawnImpl(executor)));
        self
    }

    /// Consumes the builder and creates the swarm.
    ///
    /// The supported protocol names are read from the inbound protocol of a
    /// handler freshly created by the behaviour. If no executor was
    /// configured, a thread pool whose threads are named `libp2p-task-*` is
    /// created; should that fail, the network is built without an executor.
    pub fn build(mut self) -> Swarm<TBehaviour, TConnInfo> {
        let supported_protocols = self.behaviour
            .new_handler()
            .inbound_protocol()
            .protocol_info()
            .into_iter()
            .map(|info| info.protocol_name().to_vec())
            .collect();

        let executor = self.executor.or_else(|| {
            // Adapter turning the thread pool into an `Executor`.
            struct PoolWrapper(ThreadPool);
            impl Executor for PoolWrapper {
                fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
                    self.0.spawn_ok(f)
                }
            }

            // Best effort: `.ok()` turns a pool creation error into `None`.
            ThreadPoolBuilder::new()
                .name_prefix("libp2p-task-")
                .create()
                .ok()
                .map(|tp| Box::new(PoolWrapper(tp)) as Box<_>)
        });

        let network = Network::new_with_incoming_limit(
            self.transport,
            self.local_peer_id,
            executor,
            self.incoming_limit
        );

        ExpandedSwarm {
            network,
            behaviour: self.behaviour,
            supported_protocols,
            listened_addrs: SmallVec::new(),
            external_addrs: Addresses::default(),
            banned_peers: HashSet::new(),
            send_event_to_complete: None
        }
    }
}
/// Stateless `NetworkBehaviour` that does nothing: it uses
/// `DummyProtocolsHandler`, knows no address for any peer, and its `poll` is
/// always pending.
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
impl NetworkBehaviour for DummyBehaviour {
    type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
    type OutEvent = void::Void;

    /// Creates a default dummy handler.
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        let handler: Self::ProtocolsHandler = protocols_handler::DummyProtocolsHandler::default();
        handler
    }

    /// No addresses are known for any peer.
    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
        vec![]
    }

    /// Connections are ignored.
    fn inject_connected(&mut self, _: PeerId, _: libp2p_core::ConnectedPoint) {
    }

    /// Disconnections are ignored.
    fn inject_disconnected(&mut self, _: &PeerId, _: libp2p_core::ConnectedPoint) {
    }

    /// Handler events are ignored.
    fn inject_node_event(
        &mut self,
        _: PeerId,
        _: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
    ) {
    }

    /// Never produces an action.
    fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) ->
        Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
        ProtocolsHandler>::InEvent, Self::OutEvent>>
    {
        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
    use crate::{DummyBehaviour, SwarmBuilder};
    use libp2p_core::{
        identity,
        PeerId,
        PublicKey,
        transport::dummy::{DummyStream, DummyTransport}
    };
    use libp2p_mplex::Multiplex;

    /// Public half of a freshly generated ed25519 keypair.
    fn get_random_id() -> PublicKey {
        let keypair = identity::Keypair::generate_ed25519();
        keypair.public()
    }

    /// A limit passed to the builder is visible on the built network.
    #[test]
    fn test_build_swarm() {
        let local_key = get_random_id();
        let dummy_transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
        let builder = SwarmBuilder::new(dummy_transport, DummyBehaviour {}, local_key.into())
            .incoming_limit(Some(4));
        let swarm = builder.build();
        assert_eq!(swarm.network.incoming_limit(), Some(4));
    }

    /// Without an explicit limit the built network reports none.
    #[test]
    fn test_build_swarm_with_max_listeners_none() {
        let local_key = get_random_id();
        let dummy_transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
        let builder = SwarmBuilder::new(dummy_transport, DummyBehaviour {}, local_key.into());
        let swarm = builder.build();
        assert!(swarm.network.incoming_limit().is_none());
    }
}