use crate::{
Transport, Multiaddr, PublicKey, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
muxing::StreamMuxer,
nodes::{
handled_node::NodeHandler,
node::Substream,
raw_swarm::{RawSwarm, RawSwarmEvent}
},
protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, IntoProtocolsHandler, ProtocolsHandler},
topology::Topology,
transport::TransportError,
topology::DisconnectReason,
};
use futures::prelude::*;
use smallvec::SmallVec;
use std::{fmt, io, ops::{Deref, DerefMut}};
pub use crate::nodes::raw_swarm::ConnectedPoint;
/// Ties together a `RawSwarm`, a `NetworkBehaviour` and a topology.
///
/// The `Stream` implementation drives the raw swarm and the behaviour, and
/// yields the events the behaviour generates. The struct dereferences to the
/// behaviour, which is why the inherent functions take `me` rather than
/// `self` (e.g. `Swarm::dial(&mut swarm, peer_id)`).
pub struct Swarm<TTransport, TBehaviour, TTopology>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>,
{
/// Underlying connection manager. Its event and handler type parameters are
/// all derived from the behaviour's `ProtocolsHandler`.
raw_swarm: RawSwarm<
TTransport,
<<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
NodeHandlerWrapperBuilder<TBehaviour::ProtocolsHandler>,
<<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error,
>,
/// User-supplied behaviour; this is what `Deref`/`DerefMut` expose.
behaviour: TBehaviour,
/// Topology queried for peer addresses when dialing and updated when
/// connections open, close or fail.
topology: TTopology,
/// Protocol names collected once in `new` from the listen protocol of a
/// freshly created handler.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
/// Addresses returned by successful calls to `listen_on`.
listened_addrs: SmallVec<[Multiaddr; 8]>,
}
impl<TTransport, TBehaviour, TTopology> Deref for Swarm<TTransport, TBehaviour, TTopology>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>,
{
type Target = TBehaviour;
#[inline]
fn deref(&self) -> &Self::Target {
&self.behaviour
}
}
impl<TTransport, TBehaviour, TTopology> DerefMut for Swarm<TTransport, TBehaviour, TTopology>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>,
{
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.behaviour
}
}
impl<TTransport, TBehaviour, TMuxer, TTopology> Swarm<TTransport, TBehaviour, TTopology>
where TBehaviour: NetworkBehaviour<TTopology>,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (PeerId, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
TBehaviour::ProtocolsHandler: Send + 'static,
<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<NodeHandlerWrapper<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static,
TTopology: Topology,
{
#[inline]
pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self {
let supported_protocols = behaviour
.new_handler()
.into_handler(topology.local_peer_id())
.listen_protocol()
.protocol_info()
.into_iter()
.map(|info| info.protocol_name().to_vec())
.collect();
let raw_swarm = RawSwarm::new(transport, topology.local_peer_id().clone());
Swarm {
raw_swarm,
behaviour,
topology,
supported_protocols,
listened_addrs: SmallVec::new(),
}
}
#[inline]
pub fn transport(me: &Self) -> &TTransport {
me.raw_swarm.transport()
}
#[inline]
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<Multiaddr, TransportError<TTransport::Error>> {
let result = me.raw_swarm.listen_on(addr);
if let Ok(ref addr) = result {
me.listened_addrs.push(addr.clone());
}
result
}
/// Dials `addr` without knowing which peer is behind it.
///
/// A fresh handler is requested from the behaviour for the connection
/// attempt. Returns whatever error the raw swarm reports for the dial.
#[inline]
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError<TTransport::Error>> {
    let builder = me.behaviour.new_handler().into_node_handler_builder();
    me.raw_swarm.dial(addr, builder)
}
/// Attempts to connect to `peer_id` using the addresses the topology knows.
///
/// Nothing is dialed if the raw swarm does not report the peer as "not
/// connected". The outcome of starting the attempt is deliberately discarded.
#[inline]
pub fn dial(me: &mut Self, peer_id: PeerId) {
    let candidates = me.topology.addresses_of_peer(&peer_id);
    // The handler is requested unconditionally, before looking at the peer state.
    let builder = me.behaviour.new_handler().into_node_handler_builder();

    match me.raw_swarm.peer(peer_id).as_not_connected() {
        Some(not_connected) => {
            let _ = not_connected.connect_iter(candidates, builder);
        }
        None => {}
    }
}
/// Iterates over the addresses the raw swarm reports as its listeners.
#[inline]
pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
    me.raw_swarm.listeners()
}
/// Returns the peer ID of the local node, as stored in the raw swarm.
#[inline]
pub fn local_peer_id(me: &Self) -> &PeerId {
    // The raw swarm accessor already yields a reference; borrowing it again
    // only produced a `&&PeerId` that auto-deref had to undo.
    me.raw_swarm.local_peer_id()
}
#[inline]
pub fn topology(me: &Self) -> &TTopology {
&me.topology
}
#[inline]
pub fn topology_mut(me: &mut Self) -> &mut TTopology {
&mut me.topology
}
}
impl<TTransport, TBehaviour, TMuxer, TTopology> Stream for Swarm<TTransport, TBehaviour, TTopology>
where TBehaviour: NetworkBehaviour<TTopology>,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (PeerId, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
TBehaviour::ProtocolsHandler: Send + 'static,
<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<NodeHandlerWrapper<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static,
TTopology: Topology,
{
/// Events produced by the behaviour through `NetworkBehaviourAction::GenerateEvent`.
type Item = TBehaviour::OutEvent;
/// Declared error type; no code path in `poll` below returns an error.
type Error = io::Error;
/// Drives the raw swarm and the behaviour until the behaviour generates an
/// event or both report `NotReady`.
///
/// Each loop iteration polls the raw swarm once, forwards the resulting event
/// (if any) to the topology and/or the behaviour, then polls the behaviour once
/// and executes the action it returns. The stream never yields `None` here.
#[inline]
fn poll(&mut self) -> Poll<Option<TBehaviour::OutEvent>, io::Error> {
loop {
// Reset on every iteration: we may only return `NotReady` if the raw swarm
// was not ready in the *same* iteration in which the behaviour was not ready.
let mut raw_swarm_not_ready = false;
match self.raw_swarm.poll() {
Async::NotReady => raw_swarm_not_ready = true,
Async::Ready(RawSwarmEvent::NodeEvent { peer_id, event }) => {
self.behaviour.inject_node_event(peer_id, event);
},
Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => {
// The topology is informed before the behaviour.
self.topology.set_connected(&peer_id, &endpoint);
self.behaviour.inject_connected(peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint }) => {
self.topology.set_disconnected(&peer_id, &endpoint, DisconnectReason::Graceful);
self.behaviour.inject_disconnected(&peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::NodeError { peer_id, endpoint, .. }) => {
// Same as `NodeClosed`, but recorded with the `Error` reason; the
// error value itself is not forwarded.
self.topology.set_disconnected(&peer_id, &endpoint, DisconnectReason::Error);
self.behaviour.inject_disconnected(&peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => {
// Reported as a disconnection of the old endpoint followed by a
// connection on the new one, for both topology and behaviour.
self.topology.set_disconnected(&peer_id, &closed_endpoint, DisconnectReason::Replaced);
self.topology.set_connected(&peer_id, &endpoint);
self.behaviour.inject_disconnected(&peer_id, closed_endpoint);
self.behaviour.inject_connected(peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
// Every incoming connection is accepted with a fresh handler.
let handler = self.behaviour.new_handler();
incoming.accept(handler.into_node_handler_builder());
},
// Listener shutdowns and failed incoming connections are ignored.
Async::Ready(RawSwarmEvent::ListenerClosed { .. }) => {},
Async::Ready(RawSwarmEvent::IncomingConnectionError { .. }) => {},
Async::Ready(RawSwarmEvent::DialError { multiaddr, .. }) => {
self.topology.set_unreachable(&multiaddr);
},
Async::Ready(RawSwarmEvent::UnknownPeerDialError { multiaddr, .. }) => {
self.topology.set_unreachable(&multiaddr);
},
}
// Poll the behaviour inside its own scope so the borrows taken by
// `PollParameters` end before the resulting action is executed.
let behaviour_poll = {
let transport = self.raw_swarm.transport();
let mut parameters = PollParameters {
topology: &mut self.topology,
supported_protocols: &self.supported_protocols,
listened_addrs: &self.listened_addrs,
nat_traversal: &move |a, b| transport.nat_traversal(a, b),
};
self.behaviour.poll(&mut parameters)
};
match behaviour_poll {
// Neither side made progress in this iteration.
Async::NotReady if raw_swarm_not_ready => return Ok(Async::NotReady),
// The raw swarm produced an event above, so loop and poll it again.
Async::NotReady => (),
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
return Ok(Async::Ready(Some(event)));
},
Async::Ready(NetworkBehaviourAction::DialAddress { address }) => {
// The dial result is discarded.
let _ = Swarm::dial_addr(self, address);
},
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => {
Swarm::dial(self, peer_id)
},
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => {
// The event is dropped if the peer is not currently connected.
if let Some(mut peer) = self.raw_swarm.peer(peer_id).as_connected() {
peer.send_event(event);
}
},
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
// Whatever the raw swarm's NAT traversal derives from the observed
// address is added to the topology's local external addresses.
self.topology.add_local_external_addrs(self.raw_swarm.nat_traversal(&address));
},
}
}
}
}
/// Defines how a `Swarm` reacts to network events and what it asks the network
/// to do.
///
/// The `Swarm` forwards connection events through the `inject_*` methods and
/// repeatedly calls `poll` to obtain the next `NetworkBehaviourAction`.
pub trait NetworkBehaviour<TTopology> {
/// Type from which the per-connection `ProtocolsHandler` is built.
type ProtocolsHandler: IntoProtocolsHandler;
/// Event yielded by the `Swarm` stream when `poll` returns `GenerateEvent`.
type OutEvent;
/// Creates a handler. The `Swarm` calls this once at construction, for every
/// dial attempt, and for every accepted incoming connection.
fn new_handler(&mut self) -> Self::ProtocolsHandler;
/// Called when a connection to `peer_id` is established, including the new
/// connection when an existing one is replaced.
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint);
/// Called when a connection to `peer_id` is closed, fails with an error, or
/// is replaced by another one.
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint);
/// Called with an event produced by the handler of the connection to `peer_id`.
fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
);
/// Polls for the next action the `Swarm` should perform.
///
/// Despite its name, the `topology` argument is the full `PollParameters`
/// bundle, of which the topology is only one part.
fn poll(&mut self, topology: &mut PollParameters<TTopology>) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
}
/// Implemented by types that can consume events of type `TEvent`.
///
/// NOTE(review): nothing in this file implements or calls this trait;
/// presumably it is used by code that composes several behaviours — confirm
/// against the callers.
pub trait NetworkBehaviourEventProcess<TEvent> {
/// Hands `event` to the implementor.
fn inject_event(&mut self, event: TEvent);
}
/// Borrowed view of the swarm's state handed to `NetworkBehaviour::poll`.
///
/// The `Swarm` builds a new value for each call to the behaviour's `poll`.
pub struct PollParameters<'a, TTopology: 'a> {
/// Exclusive borrow of the swarm's topology.
topology: &'a mut TTopology,
/// Protocol names the swarm collected at construction.
supported_protocols: &'a [Vec<u8>],
/// Addresses the swarm is listening on.
listened_addrs: &'a [Multiaddr],
/// Callback that delegates to the transport's `nat_traversal`.
nat_traversal: &'a dyn Fn(&Multiaddr, &Multiaddr) -> Option<Multiaddr>,
}
impl<'a, TTopology> PollParameters<'a, TTopology> {
    /// Returns exclusive access to the topology borrowed from the swarm.
    #[inline]
    pub fn topology(&mut self) -> &mut TTopology {
        &mut *self.topology
    }

    /// Iterates over the names of the supported protocols, as byte slices.
    #[inline]
    pub fn supported_protocols(&self) -> impl ExactSizeIterator<Item = &[u8]> {
        self.supported_protocols.iter().map(Vec::as_slice)
    }

    /// Iterates over the addresses the swarm is listening on.
    #[inline]
    pub fn listened_addresses(&self) -> impl ExactSizeIterator<Item = &Multiaddr> {
        let addrs: &[Multiaddr] = self.listened_addrs;
        addrs.iter()
    }

    /// Iterates over the addresses the topology has on record for the local
    /// peer ID.
    #[inline]
    pub fn external_addresses<'b>(&'b mut self) -> impl ExactSizeIterator<Item = Multiaddr> + 'b
    where
        TTopology: Topology,
    {
        // The ID is cloned so the topology can be borrowed again for the lookup.
        let own_id = self.topology.local_peer_id().clone();
        let known = self.topology.addresses_of_peer(&own_id);
        known.into_iter()
    }

    /// Returns the local public key as reported by the topology.
    #[inline]
    pub fn local_public_key(&self) -> &PublicKey
    where
        TTopology: Topology,
    {
        let topology: &TTopology = &*self.topology;
        topology.local_public_key()
    }

    /// Returns the local peer ID as reported by the topology.
    #[inline]
    pub fn local_peer_id(&self) -> &PeerId
    where
        TTopology: Topology,
    {
        let topology: &TTopology = &*self.topology;
        topology.local_peer_id()
    }

    /// Forwards `server` and `observed` to the NAT traversal callback supplied
    /// by the swarm and returns its result.
    #[inline]
    pub fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
        let translate = self.nat_traversal;
        translate(server, observed)
    }
}
/// Action that a `NetworkBehaviour` asks the `Swarm` to perform, returned from
/// `NetworkBehaviour::poll`.
#[derive(Debug, Clone)]
pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
/// Make the `Swarm` stream yield this event to its consumer.
GenerateEvent(TOutEvent),
/// Dial the given address without knowing the peer behind it. The `Swarm`
/// discards the result of starting the dial.
DialAddress {
/// Address to dial.
address: Multiaddr,
},
/// Connect to the given peer using the addresses known to the topology.
/// Has no effect unless the peer is in the "not connected" state.
DialPeer {
/// Peer to connect to.
peer_id: PeerId,
},
/// Send an event to the handler of the connection to a peer. The event is
/// dropped if the peer is not connected.
SendEvent {
/// Peer whose handler should receive the event.
peer_id: PeerId,
/// Event to deliver.
event: TInEvent,
},
/// Report an address under which the local node was observed. The `Swarm`
/// runs it through NAT traversal and adds the result to the topology's
/// local external addresses.
ReportObservedAddr {
/// The observed address.
address: Multiaddr,
},
}