cs_mwc_libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High level manager of the network.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that
35//!     will be used in order to reach nodes on the network based on their
36//!     address. See the `transport` module for more information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state
38//!     machine that defines how the swarm should behave once it is connected
39//!     to a node.
40//!
41//! # Network Behaviour
42//!
43//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
44//! the swarm how it should behave. This includes which protocols are supported
45//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
46//! controls what happens on the network. Multiple types that implement
47//! `NetworkBehaviour` can be composed into a single behaviour.
48//!
49//! # Protocols Handler
50//!
51//! The [`ProtocolsHandler`] trait defines how each active connection to a
52//! remote should behave: how to handle incoming substreams, which protocols
53//! are supported, when to open a new outbound substream, etc.
54//!
55
56mod behaviour;
57mod registry;
58#[cfg(test)]
59mod test;
60mod upgrade;
61
62pub mod protocols_handler;
63pub mod toggle;
64
65pub use behaviour::{
66    NetworkBehaviour,
67    NetworkBehaviourAction,
68    NetworkBehaviourEventProcess,
69    PollParameters,
70    NotifyHandler,
71    DialPeerCondition
72};
73pub use protocols_handler::{
74    IntoProtocolsHandler,
75    IntoProtocolsHandlerSelect,
76    KeepAlive,
77    ProtocolsHandler,
78    ProtocolsHandlerEvent,
79    ProtocolsHandlerSelect,
80    ProtocolsHandlerUpgrErr,
81    OneShotHandler,
82    OneShotHandlerConfig,
83    SubstreamProtocol
84};
85pub use registry::{AddressScore, AddressRecord, AddAddressResult};
86
87use protocols_handler::{
88    NodeHandlerWrapperBuilder,
89    NodeHandlerWrapperError,
90};
91use futures::{
92    prelude::*,
93    executor::ThreadPoolBuilder,
94    stream::FusedStream,
95};
96use mwc_libp2p_core::{
97    Executor,
98    Transport,
99    Multiaddr,
100    Negotiated,
101    PeerId,
102    connection::{
103        ConnectionError,
104        ConnectionId,
105        ConnectionLimit,
106        ConnectedPoint,
107        EstablishedConnection,
108        IntoConnectionHandler,
109        ListenerId,
110        PendingConnectionError,
111        Substream
112    },
113    transport::{self, TransportError},
114    muxing::StreamMuxerBox,
115    network::{
116        ConnectionLimits,
117        Network,
118        NetworkInfo,
119        NetworkEvent,
120        NetworkConfig,
121        peer::ConnectedPeer,
122    },
123    upgrade::{ProtocolName},
124};
125use registry::{Addresses, AddressIntoIter};
126use smallvec::SmallVec;
127use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
128use std::collections::HashSet;
129use std::num::{NonZeroU32, NonZeroUsize};
130use upgrade::UpgradeInfoSend as _;
131
132/// Contains the state of the network, plus the way it should behave (see [`ExpandedSwarm`]).
133pub type Swarm<TBehaviour> = ExpandedSwarm<
134    TBehaviour,
135    <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
136    <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
137    <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
138>;
139
140/// Substream of a [`StreamMuxerBox`] for which a protocol has been chosen.
141///
142/// Implements the [`AsyncRead`](futures::io::AsyncRead) and
143/// [`AsyncWrite`](futures::io::AsyncWrite) traits.
144pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
145
146/// Event generated by the `Swarm`.
147#[derive(Debug)]
148pub enum SwarmEvent<TBvEv, THandleErr> {
149    /// Event generated by the `NetworkBehaviour`.
150    Behaviour(TBvEv),
151    /// A connection to the given peer has been opened.
152    ConnectionEstablished {
153        /// Identity of the peer that we have connected to.
154        peer_id: PeerId,
155        /// Endpoint of the connection that has been opened.
156        endpoint: ConnectedPoint,
157        /// Number of established connections to this peer, including the one that has just been
158        /// opened.
159        num_established: NonZeroU32,
160    },
161    /// A connection with the given peer has been closed,
162    /// possibly as a result of an error.
163    ConnectionClosed {
164        /// Identity of the peer whose connection has been closed.
165        peer_id: PeerId,
166        /// Endpoint of the connection that has been closed.
167        endpoint: ConnectedPoint,
168        /// Number of other remaining connections to this same peer.
169        num_established: u32,
170        /// Reason for the disconnection, if it was not a successful
171        /// active close.
172        cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
173    },
174    /// A new connection arrived on a listener and is in the process of protocol negotiation.
175    ///
176    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished),
177    /// [`BannedPeer`](SwarmEvent::BannedPeer), or
178    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
179    /// generated for this connection.
180    IncomingConnection {
181        /// Local connection address.
182        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
183        /// event.
184        local_addr: Multiaddr,
185        /// Address used to send back data to the remote.
186        send_back_addr: Multiaddr,
187    },
188    /// An error happened on an incoming connection during its initial handshake.
189    ///
190    /// This can include, for example, an error during the handshake of the encryption layer, or
191    /// the connection unexpectedly closed.
192    IncomingConnectionError {
193        /// Local connection address.
194        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
195        /// event.
196        local_addr: Multiaddr,
197        /// Address used to send back data to the remote.
198        send_back_addr: Multiaddr,
199        /// The error that happened.
200        error: PendingConnectionError<io::Error>,
201    },
202    /// We connected to a peer, but we immediately closed the connection because that peer is banned.
203    BannedPeer {
204        /// Identity of the banned peer.
205        peer_id: PeerId,
206        /// Endpoint of the connection that has been closed.
207        endpoint: ConnectedPoint,
208    },
209    /// Tried to dial an address but it ended up being unreachable.
210    UnreachableAddr {
211        /// `PeerId` that we were trying to reach.
212        peer_id: PeerId,
213        /// Address that we failed to reach.
214        address: Multiaddr,
215        /// Error that has been encountered.
216        error: PendingConnectionError<io::Error>,
217        /// Number of remaining connection attempts that are being tried for this peer.
218        attempts_remaining: u32,
219    },
220    /// Tried to dial an address but it ended up being unreachable.
221    /// Contrary to `UnreachableAddr`, we don't know the identity of the peer that we were trying
222    /// to reach.
223    UnknownPeerUnreachableAddr {
224        /// Address that we failed to reach.
225        address: Multiaddr,
226        /// Error that has been encountered.
227        error: PendingConnectionError<io::Error>,
228    },
229    /// One of our listeners has reported a new local listening address.
230    NewListenAddr(Multiaddr),
231    /// One of our listeners has reported the expiration of a listening address.
232    ExpiredListenAddr(Multiaddr),
233    /// One of the listeners closed, either gracefully or because of an error (see `reason`).
234    ListenerClosed {
235        /// The addresses that the listener was listening on. These addresses are now considered
236        /// expired, as if an [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
237        /// had been generated for each of them.
238        addresses: Vec<Multiaddr>,
239        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
240        /// if the stream produced an error.
241        reason: Result<(), io::Error>,
242    },
243    /// One of the listeners reported a non-fatal error.
244    ListenerError {
245        /// The listener error.
246        error: io::Error,
247    },
248    /// A new dialing attempt has been initiated.
249    ///
250    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished)
251    /// event is reported if the dialing attempt succeeds, otherwise a
252    /// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported
253    /// with `attempts_remaining` equal to 0.
254    Dialing(PeerId),
255}
256
257/// Contains the state of the network, plus the way it should behave.
258pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
259where
260    THandler: IntoProtocolsHandler,
261{
262    network: Network<
263        transport::Boxed<(PeerId, StreamMuxerBox)>,
264        TInEvent,
265        TOutEvent,
266        NodeHandlerWrapperBuilder<THandler>,
267    >,
268
269    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
270    /// handlers.
271    behaviour: TBehaviour,
272
273    /// List of protocols that the behaviour says it supports.
274    supported_protocols: SmallVec<[Vec<u8>; 16]>,
275
276    /// List of multiaddresses we're listening on.
277    listened_addrs: SmallVec<[Multiaddr; 8]>,
278
279    /// List of multiaddresses we're listening on, after account for external IP addresses and
280    /// similar mechanisms.
281    external_addrs: Addresses,
282
283    /// List of nodes for which we deny any incoming connection.
284    banned_peers: HashSet<PeerId>,
285
286    /// Pending event to be delivered to connection handlers
287    /// (or dropped if the peer disconnected) before the `behaviour`
288    /// can be polled again.
289    pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,
290
291    /// The configured override for substream protocol upgrades, if any.
292    substream_upgrade_protocol_override: Option<mwc_libp2p_core::upgrade::Version>,
293}
294
295impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
296    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
297where
298    THandler: IntoProtocolsHandler,
299{
300    type Target = TBehaviour;
301
302    fn deref(&self) -> &Self::Target {
303        &self.behaviour
304    }
305}
306
307impl<TBehaviour, TInEvent, TOutEvent, THandler> DerefMut for
308    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
309where
310    THandler: IntoProtocolsHandler,
311{
312    fn deref_mut(&mut self) -> &mut Self::Target {
313        &mut self.behaviour
314    }
315}
316
317impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
318    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
319where
320    THandler: IntoProtocolsHandler,
321{
322}
323
324impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
325    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
326where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
327      TInEvent: Send + 'static,
328      TOutEvent: Send + 'static,
329      THandler: IntoProtocolsHandler + Send + 'static,
330      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
331      THandleErr: error::Error + Send + 'static,
332{
333    /// Builds a new `Swarm` by handing the arguments to a [`SwarmBuilder`] left at its defaults.
334    pub fn new(
335        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
336        behaviour: TBehaviour,
337        local_peer_id: PeerId
338    ) -> Self {
339        SwarmBuilder::new(transport, behaviour, local_peer_id).build()
340    }
341
342    /// Returns the [`NetworkInfo`] reported by the [`Network`] underlying the `Swarm`.
343    pub fn network_info(me: &Self) -> NetworkInfo {
344        me.network.info()
345    }
346
347    /// Starts listening on the given address.
348    ///
349    /// Returns the ID of the new listener, or an error if the address is not supported.
350    pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
351        me.network.listen_on(addr)
352    }
353
354    /// Removes the listener with the given ID.
355    ///
356    /// Returns `Ok(())` if there was a listener with this ID, `Err(())` otherwise.
357    pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
358        me.network.remove_listener(id)
359    }
360
361    /// Initiates a new dialing attempt to the given address.
362    pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
363        let handler = me.behaviour.new_handler()
364            .into_node_handler_builder()
365            .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
366        me.network.dial(&addr, handler).map(|_id| ())
367    }
368
369    /// Initiates a new dialing attempt to the given peer.
370    pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
371        if me.banned_peers.contains(peer_id) {
372            me.behaviour.inject_dial_failure(peer_id);
373            return Err(DialError::Banned)
374        }
375
376        let self_listening = &me.listened_addrs;
377        let mut addrs = me.behaviour.addresses_of_peer(peer_id)
378            .into_iter()
379            .filter(|a| !self_listening.contains(a));
380
381        let result =
382            if let Some(first) = addrs.next() {
383                let handler = me.behaviour.new_handler()
384                    .into_node_handler_builder()
385                    .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
386                me.network.peer(*peer_id)
387                    .dial(first, addrs, handler)
388                    .map(|_| ())
389                    .map_err(DialError::ConnectionLimit)
390            } else {
391                Err(DialError::NoAddresses)
392            };
393
394        if let Err(error) = &result {
395            log::debug!(
396                "New dialing attempt to peer {:?} failed: {:?}.",
397                peer_id, error);
398            me.behaviour.inject_dial_failure(&peer_id);
399        }
400
401        result
402    }
403
404    /// Returns an iterator over the addresses that the underlying [`Network`] is listening on.
405    pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
406        me.network.listen_addrs()
407    }
408
409    /// Returns the local [`PeerId`] of the swarm passed as parameter.
410    pub fn local_peer_id(me: &Self) -> &PeerId {
411        me.network.local_peer_id()
412    }
413
414    /// Returns an iterator over the [`AddressRecord`]s of external addresses
415    /// of the local node, in decreasing order of their current
416    /// [score](AddressScore).
417    pub fn external_addresses(me: &Self) -> impl Iterator<Item = &AddressRecord> {
418        me.external_addrs.iter()
419    }
420
421    /// Adds an external address record for the local node, with address `a` and score `s`.
422    ///
423    /// An external address is an address of the local node known to
424    /// be (likely) reachable for other nodes, possibly taking into
425    /// account NAT. The external addresses of the local node may be
426    /// shared with other nodes by the `NetworkBehaviour`.
427    ///
428    /// The associated score determines both the position of the address
429    /// in the list of external addresses (which can determine the
430    /// order in which addresses are used to connect to) as well as
431    /// how long the address is retained in the list, depending on
432    /// how frequently it is reported by the `NetworkBehaviour` via
433    /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
434    /// through this method.
435    pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
436        me.external_addrs.add(a, s)
437    }
438
439    /// Removes an external address of the local node, regardless of
440    /// its current score. See [`ExpandedSwarm::add_external_address`]
441    /// for details on external addresses and their scores.
442    ///
443    /// Returns `true` if the address existed and was removed, `false`
444    /// otherwise.
445    pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool {
446        me.external_addrs.remove(addr)
447    }
448
449    /// Bans a peer by its peer ID.
450    ///
451    /// Any incoming connection and any dialing attempt will immediately be rejected.
452    /// This function has no effect if the peer is already banned.
453    pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
454        if me.banned_peers.insert(peer_id) {
455            if let Some(peer) = me.network.peer(peer_id).into_connected() {
456                peer.disconnect();
457            }
458        }
459    }
460
461    /// Unbans a peer. This function has no effect if the peer is not banned.
462    pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
463        me.banned_peers.remove(&peer_id);
464    }
465
466    /// Checks whether the [`Network`] has an established connection to the given peer.
467    pub fn is_connected(me: &Self, peer_id: &PeerId) -> bool {
468        me.network.is_connected(peer_id)
469    }
470
471    /// Checks whether the [`Network`] is currently dialing the given peer.
472    pub fn is_dialing(me: &Self, peer_id: &PeerId) -> bool {
473        me.network.is_dialing(peer_id)
474    }
475
476    /// Returns the next event that happens in the `Swarm`.
477    ///
478    /// Includes events from the `NetworkBehaviour` but also events about the connections status.
479    pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
480        future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
481    }
482
483    /// Returns the next event produced by the [`NetworkBehaviour`].
484    pub async fn next(&mut self) -> TBehaviour::OutEvent {
485        future::poll_fn(move |cx| {
486            loop {
487                let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
488                if let SwarmEvent::Behaviour(event) = event {
489                    return Poll::Ready(event);
490                }
491            }
492        }).await
493    }
494
495    /// Internal function used by everything event-related.
496    ///
497    /// Polls the `Swarm` for the next event.
498    fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
499        -> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
500    {
501        // We use a `this` variable because the compiler can't mutably borrow multiple times
502        // across a `Deref`.
503        let this = &mut *self;
504
505        loop {
506            let mut network_not_ready = false;
507
508            // First let the network make progress.
509            match this.network.poll(cx) {
510                Poll::Pending => network_not_ready = true,
511                Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
512                    let peer = connection.peer_id();
513                    let connection = connection.id();
514                    this.behaviour.inject_event(peer, connection, event);
515                },
516                Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
517                    let peer = connection.peer_id();
518                    let connection = connection.id();
519                    this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
520                },
521                Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
522                    let peer_id = connection.peer_id();
523                    let endpoint = connection.endpoint().clone();
524                    if this.banned_peers.contains(&peer_id) {
525                        this.network.peer(peer_id)
526                            .into_connected()
527                            .expect("the Network just notified us that we were connected; QED")
528                            .disconnect();
529                        return Poll::Ready(SwarmEvent::BannedPeer {
530                            peer_id,
531                            endpoint,
532                        });
533                    } else {
534                        log::debug!("Connection established: {:?}; Total (peer): {}.",
535                            connection.connected(), num_established);
536                        let endpoint = connection.endpoint().clone();
537                        this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
538                        if num_established.get() == 1 {
539                            this.behaviour.inject_connected(&peer_id);
540                        }
541                        return Poll::Ready(SwarmEvent::ConnectionEstablished {
542                            peer_id, num_established, endpoint
543                        });
544                    }
545                },
546                Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
547                    if let Some(error) = error.as_ref() {
548                        log::debug!("Connection {:?} closed: {:?}", connected, error);
549                    } else {
550                        log::debug!("Connection {:?} closed (active close).", connected);
551                    }
552                    let peer_id = connected.peer_id;
553                    let endpoint = connected.endpoint;
554                    this.behaviour.inject_connection_closed(&peer_id, &id, &endpoint);
555                    if num_established == 0 {
556                        this.behaviour.inject_disconnected(&peer_id);
557                    }
558                    return Poll::Ready(SwarmEvent::ConnectionClosed {
559                        peer_id,
560                        endpoint,
561                        cause: error,
562                        num_established,
563                    });
564                },
565                Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
566                    let handler = this.behaviour.new_handler()
567                        .into_node_handler_builder()
568                        .with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
569                    let local_addr = connection.local_addr.clone();
570                    let send_back_addr = connection.send_back_addr.clone();
571                    if let Err(e) = this.network.accept(connection, handler) {
572                        log::warn!("Incoming connection rejected: {:?}", e);
573                    }
574                    return Poll::Ready(SwarmEvent::IncomingConnection {
575                        local_addr,
576                        send_back_addr,
577                    });
578                },
579                Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
580                    log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
581                    if !this.listened_addrs.contains(&listen_addr) {
582                        this.listened_addrs.push(listen_addr.clone())
583                    }
584                    this.behaviour.inject_new_listen_addr(&listen_addr);
585                    return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
586                }
587                Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
588                    log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
589                    this.listened_addrs.retain(|a| a != &listen_addr);
590                    this.behaviour.inject_expired_listen_addr(&listen_addr);
591                    return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
592                }
593                Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
594                    log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
595                    for addr in addresses.iter() {
596                        this.behaviour.inject_expired_listen_addr(addr);
597                    }
598                    this.behaviour.inject_listener_closed(listener_id, match &reason {
599                        Ok(()) => Ok(()),
600                        Err(err) => Err(err),
601                    });
602                    return Poll::Ready(SwarmEvent::ListenerClosed {
603                        addresses,
604                        reason,
605                    });
606                }
607                Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
608                    this.behaviour.inject_listener_error(listener_id, &error);
609                    return Poll::Ready(SwarmEvent::ListenerError {
610                        error,
611                    });
612                },
613                Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
614                    log::debug!("Incoming connection failed: {:?}", error);
615                    return Poll::Ready(SwarmEvent::IncomingConnectionError {
616                        local_addr,
617                        send_back_addr,
618                        error,
619                    });
620                },
621                Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
622                    log::debug!(
623                        "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
624                        peer_id, multiaddr, error, attempts_remaining);
625                    this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
626                    if attempts_remaining == 0 {
627                        this.behaviour.inject_dial_failure(&peer_id);
628                    }
629                    return Poll::Ready(SwarmEvent::UnreachableAddr {
630                        peer_id,
631                        address: multiaddr,
632                        error,
633                        attempts_remaining,
634                    });
635                },
636                Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
637                    log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
638                        multiaddr, error);
639                    this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
640                    return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
641                        address: multiaddr,
642                        error,
643                    });
644                },
645            }
646
647            // After the network had a chance to make progress, try to deliver
648            // the pending event emitted by the behaviour in the previous iteration
649            // to the connection handler(s). The pending event must be delivered
650            // before polling the behaviour again. If the targeted peer
651            // meanwhile disconnected, the event is discarded.
652            if let Some((peer_id, handler, event)) = this.pending_event.take() {
653                if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
654                    match handler {
655                        PendingNotifyHandler::One(conn_id) =>
656                            if let Some(mut conn) = peer.connection(conn_id) {
657                                if let Some(event) = notify_one(&mut conn, event, cx) {
658                                    this.pending_event = Some((peer_id, handler, event));
659                                    return Poll::Pending
660                                }
661                            },
662                        PendingNotifyHandler::Any(ids) => {
663                            if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
664                                let handler = PendingNotifyHandler::Any(ids);
665                                this.pending_event = Some((peer_id, handler, event));
666                                return Poll::Pending
667                            }
668                        }
669                    }
670                }
671            }
672
673            debug_assert!(this.pending_event.is_none());
674
675            let behaviour_poll = {
676                let mut parameters = SwarmPollParameters {
677                    local_peer_id: &mut this.network.local_peer_id(),
678                    supported_protocols: &this.supported_protocols,
679                    listened_addrs: &this.listened_addrs,
680                    external_addrs: &this.external_addrs
681                };
682                this.behaviour.poll(cx, &mut parameters)
683            };
684
685            match behaviour_poll {
686                Poll::Pending if network_not_ready => return Poll::Pending,
687                Poll::Pending => (),
688                Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
689                    return Poll::Ready(SwarmEvent::Behaviour(event))
690                },
691                Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
692                    let _ = ExpandedSwarm::dial_addr(&mut *this, address);
693                },
694                Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
695                    if this.banned_peers.contains(&peer_id) {
696                        this.behaviour.inject_dial_failure(&peer_id);
697                    } else {
698                        let condition_matched = match condition {
699                            DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id),
700                            DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id),
701                            DialPeerCondition::Always => true,
702                        };
703                        if condition_matched {
704                            if ExpandedSwarm::dial(this, &peer_id).is_ok() {
705                                return Poll::Ready(SwarmEvent::Dialing(peer_id))
706                            }
707                        } else {
708                            // Even if the condition for a _new_ dialing attempt is not met,
709                            // we always add any potentially new addresses of the peer to an
710                            // ongoing dialing attempt, if there is one.
711                            log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
712                                peer_id, condition);
713                            let self_listening = &this.listened_addrs;
714                            if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
715                                let addrs = this.behaviour.addresses_of_peer(peer.id());
716                                let mut attempt = peer.some_attempt();
717                                for a in addrs {
718                                    if !self_listening.contains(&a) {
719                                        attempt.add_address(a);
720                                    }
721                                }
722                            }
723                        }
724                    }
725                },
726                Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
727                    if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
728                        match handler {
729                            NotifyHandler::One(connection) => {
730                                if let Some(mut conn) = peer.connection(connection) {
731                                    if let Some(event) = notify_one(&mut conn, event, cx) {
732                                        let handler = PendingNotifyHandler::One(connection);
733                                        this.pending_event = Some((peer_id, handler, event));
734                                        return Poll::Pending
735                                    }
736                                }
737                            }
738                            NotifyHandler::Any => {
739                                let ids = peer.connections().into_ids().collect();
740                                if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
741                                    let handler = PendingNotifyHandler::Any(ids);
742                                    this.pending_event = Some((peer_id, handler, event));
743                                    return Poll::Pending
744                                }
745                            }
746                        }
747                    }
748                },
749                Poll::Ready(NetworkBehaviourAction::DisconnectPeer { peer_id }) => {
750                    this.disconnect_peer(&peer_id);
751                },
752                Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
753                    for addr in this.network.address_translation(&address) {
754                        if this.external_addrs.iter().all(|a| a.addr != addr) {
755                            this.behaviour.inject_new_external_addr(&addr);
756                        }
757                        this.external_addrs.add(addr, score);
758                    }
759                },
760            }
761        }
762    }
763
764    pub fn disconnect_peer( &mut self, peer_id: &PeerId ) {
765        if let Some(mut peer) = self.network.peer(peer_id.clone()).into_connected() {
766            let mut con_iter = peer.connections();
767            loop {
768                if let Some(con) = con_iter.next() {
769                    con.start_close();
770                }
771                else {
772                    break;
773                }
774            }
775        }
776    }
777
778    pub fn get_behaviour(&mut self) -> &mut TBehaviour {
779        &mut self.behaviour
780    }
781
782}
783
/// Connection(s) to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    /// The event must be delivered to this one specific connection.
    One(ConnectionId),
    /// The event may be delivered to any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
794
795/// Notify a single connection of an event.
796///
797/// Returns `Some` with the given event if the connection is not currently
798/// ready to receive another event, in which case the current task is
799/// scheduled to be woken up.
800///
801/// Returns `None` if the connection is closing or the event has been
802/// successfully sent, in either case the event is consumed.
803fn notify_one<'a, TInEvent>(
804    conn: &mut EstablishedConnection<'a, TInEvent>,
805    event: TInEvent,
806    cx: &mut Context<'_>,
807) -> Option<TInEvent>
808{
809    match conn.poll_ready_notify_handler(cx) {
810        Poll::Pending => Some(event),
811        Poll::Ready(Err(())) => None, // connection is closing
812        Poll::Ready(Ok(())) => {
813            // Can now only fail if connection is closing.
814            let _ = conn.notify_handler(event);
815            None
816        }
817    }
818}
819
820/// Notify any one of a given list of connections of a peer of an event.
821///
822/// Returns `Some` with the given event and a new list of connections if
823/// none of the given connections was able to receive the event but at
824/// least one of them is not closing, in which case the current task
825/// is scheduled to be woken up. The returned connections are those which
826/// may still become ready to receive another event.
827///
828/// Returns `None` if either all connections are closing or the event
829/// was successfully sent to a handler, in either case the event is consumed.
830fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
831    ids: SmallVec<[ConnectionId; 10]>,
832    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
833    event: TInEvent,
834    cx: &mut Context<'_>,
835) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
836where
837    TTrans: Transport,
838    THandler: IntoConnectionHandler,
839{
840    let mut pending = SmallVec::new();
841    let mut event = Some(event); // (1)
842    for id in ids.into_iter() {
843        if let Some(mut conn) = peer.connection(id) {
844            match conn.poll_ready_notify_handler(cx) {
845                Poll::Pending => pending.push(id),
846                Poll::Ready(Err(())) => {} // connection is closing
847                Poll::Ready(Ok(())) => {
848                    let e = event.take().expect("by (1),(2)");
849                    if let Err(e) = conn.notify_handler(e) {
850                        event = Some(e) // (2)
851                    } else {
852                        break
853                    }
854                }
855            }
856        }
857    }
858
859    event.and_then(|e|
860        if !pending.is_empty() {
861            Some((e, pending))
862        } else {
863            None
864        })
865}
866
867impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
868    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
869where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
870      THandler: IntoProtocolsHandler + Send + 'static,
871      TInEvent: Send + 'static,
872      TOutEvent: Send + 'static,
873      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
874{
875    type Item = TBehaviour::OutEvent;
876
877    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
878        loop {
879            let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
880            if let SwarmEvent::Behaviour(event) = event {
881                return Poll::Ready(Some(event));
882            }
883        }
884    }
885}
886
/// The stream of behaviour events never terminates, so `FusedStream` can be
/// implemented for it.
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
      THandler: IntoProtocolsHandler + Send + 'static,
      TInEvent: Send + 'static,
      TOutEvent: Send + 'static,
      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
{
    /// Always reports `false`.
    fn is_terminated(&self) -> bool {
        false
    }
}
900
/// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
///
/// All fields borrow state owned by the swarm for the duration of a single
/// `poll()` call.
// TODO: #[derive(Debug)]
pub struct SwarmPollParameters<'a> {
    /// Peer ID of the local node.
    local_peer_id: &'a PeerId,
    /// Protocol names (as raw bytes) handed out by `supported_protocols()`.
    supported_protocols: &'a [Vec<u8>],
    /// Addresses handed out by `listened_addresses()`.
    listened_addrs: &'a [Multiaddr],
    /// Address collection handed out by `external_addresses()`.
    external_addrs: &'a Addresses,
}
909
910impl<'a> PollParameters for SwarmPollParameters<'a> {
911    type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
912    type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
913    type ExternalAddressesIter = AddressIntoIter;
914
915    fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
916        self.supported_protocols.to_vec().into_iter()
917    }
918
919    fn listened_addresses(&self) -> Self::ListenedAddressesIter {
920        self.listened_addrs.to_vec().into_iter()
921    }
922
923    fn external_addresses(&self) -> Self::ExternalAddressesIter {
924        self.external_addrs.clone().into_iter()
925    }
926
927    fn local_peer_id(&self) -> &PeerId {
928        &self.local_peer_id
929    }
930}
931
/// A `SwarmBuilder` provides an API for configuring and constructing a `Swarm`,
/// including the underlying [`Network`].
pub struct SwarmBuilder<TBehaviour> {
    /// Peer ID of the local node, handed to the `Network` on build.
    local_peer_id: PeerId,
    /// Transport handed to the `Network` on build.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
    /// Behaviour that the built `Swarm` will own.
    behaviour: TBehaviour,
    /// Accumulated network configuration (executor, buffer sizes, limits).
    network_config: NetworkConfig,
    /// Optional override of the substream upgrade protocol version.
    substream_upgrade_protocol_override: Option<mwc_libp2p_core::upgrade::Version>,
}
941
942impl<TBehaviour> SwarmBuilder<TBehaviour>
943where TBehaviour: NetworkBehaviour,
944{
945    /// Creates a new `SwarmBuilder` from the given transport, behaviour and
946    /// local peer ID. The `Swarm` with its underlying `Network` is obtained
947    /// via [`SwarmBuilder::build`].
948    pub fn new(
949        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
950        behaviour: TBehaviour,
951        local_peer_id: PeerId
952    ) -> Self {
953        SwarmBuilder {
954            local_peer_id,
955            transport,
956            behaviour,
957            network_config: Default::default(),
958            substream_upgrade_protocol_override: None,
959        }
960    }
961
962    /// Configures the `Executor` to use for spawning background tasks.
963    ///
964    /// By default, unless another executor has been configured,
965    /// [`SwarmBuilder::build`] will try to set up a `ThreadPool`.
966    pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
967        self.network_config = self.network_config.with_executor(e);
968        self
969    }
970
971    /// Configures the number of events from the [`NetworkBehaviour`] in
972    /// destination to the [`ProtocolsHandler`] that can be buffered before
973    /// the [`Swarm`] has to wait. An individual buffer with this number of
974    /// events exists for each individual connection.
975    ///
976    /// The ideal value depends on the executor used, the CPU speed, and the
977    /// volume of events. If this value is too low, then the [`Swarm`] will
978    /// be sleeping more often than necessary. Increasing this value increases
979    /// the overall memory usage.
980    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
981        self.network_config = self.network_config.with_notify_handler_buffer_size(n);
982        self
983    }
984
985    /// Configures the number of extra events from the [`ProtocolsHandler`] in
986    /// destination to the [`NetworkBehaviour`] that can be buffered before
987    /// the [`ProtocolsHandler`] has to go to sleep.
988    ///
989    /// There exists a buffer of events received from [`ProtocolsHandler`]s
990    /// that the [`NetworkBehaviour`] has yet to process. This buffer is
991    /// shared between all instances of [`ProtocolsHandler`]. Each instance of
992    /// [`ProtocolsHandler`] is guaranteed one slot in this buffer, meaning
993    /// that delivering an event for the first time is guaranteed to be
994    /// instantaneous. Any extra event delivery, however, must wait for that
995    /// first event to be delivered or for an "extra slot" to be available.
996    ///
997    /// This option configures the number of such "extra slots" in this
998    /// shared buffer. These extra slots are assigned in a first-come,
999    /// first-served basis.
1000    ///
1001    /// The ideal value depends on the executor used, the CPU speed, the
1002    /// average number of connections, and the volume of events. If this value
1003    /// is too low, then the [`ProtocolsHandler`]s will be sleeping more often
1004    /// than necessary. Increasing this value increases the overall memory
1005    /// usage, and more importantly the latency between the moment when an
1006    /// event is emitted and the moment when it is received by the
1007    /// [`NetworkBehaviour`].
1008    pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
1009        self.network_config = self.network_config.with_connection_event_buffer_size(n);
1010        self
1011    }
1012
1013    /// Configures the connection limits.
1014    pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
1015        self.network_config = self.network_config.with_connection_limits(limits);
1016        self
1017    }
1018
1019    /// Configures an override for the substream upgrade protocol to use.
1020    ///
1021    /// The subtream upgrade protocol is the multistream-select protocol
1022    /// used for protocol negotiation on substreams. Since a listener
1023    /// supports all existing versions, the choice of upgrade protocol
1024    /// only effects the "dialer", i.e. the peer opening a substream.
1025    ///
1026    /// > **Note**: If configured, specific upgrade protocols for
1027    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1028    /// > are ignored.
1029    pub fn substream_upgrade_protocol_override(mut self, v: mwc_libp2p_core::upgrade::Version) -> Self {
1030        self.substream_upgrade_protocol_override = Some(v);
1031        self
1032    }
1033
1034    /// Builds a `Swarm` with the current configuration.
1035    pub fn build(mut self) -> Swarm<TBehaviour> {
1036        let supported_protocols = self.behaviour
1037            .new_handler()
1038            .inbound_protocol()
1039            .protocol_info()
1040            .into_iter()
1041            .map(|info| info.protocol_name().to_vec())
1042            .collect();
1043
1044        // If no executor has been explicitly configured, try to set up a thread pool.
1045        let network_cfg = self.network_config.or_else_with_executor(|| {
1046            match ThreadPoolBuilder::new()
1047                .name_prefix("mwc-libp2p-swarm-task-")
1048                .create()
1049            {
1050                Ok(tp) => {
1051                    Some(Box::new(move |f| tp.spawn_ok(f)))
1052                },
1053                Err(err) => {
1054                    log::warn!("Failed to create executor thread pool: {:?}", err);
1055                    None
1056                }
1057            }
1058        });
1059
1060        let network = Network::new(self.transport, self.local_peer_id, network_cfg);
1061
1062        ExpandedSwarm {
1063            network,
1064            behaviour: self.behaviour,
1065            supported_protocols,
1066            listened_addrs: SmallVec::new(),
1067            external_addrs: Addresses::default(),
1068            banned_peers: HashSet::new(),
1069            pending_event: None,
1070            substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
1071        }
1072    }
1073}
1074
/// The possible failures of [`ExpandedSwarm::dial`].
///
/// Only [`DialError::ConnectionLimit`] carries an underlying error that is
/// exposed through `std::error::Error::source`.
#[derive(Debug)]
pub enum DialError {
    /// The peer is currently banned.
    Banned,
    /// The configured limit for simultaneous outgoing connections
    /// has been reached.
    ConnectionLimit(ConnectionLimit),
    /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
    /// for the peer to dial.
    NoAddresses
}
1087
1088impl fmt::Display for DialError {
1089    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1090        match self {
1091            DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
1092            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1093            DialError::Banned => write!(f, "Dial error: peer is banned.")
1094        }
1095    }
1096}
1097
1098impl error::Error for DialError {
1099    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1100        match self {
1101            DialError::ConnectionLimit(err) => Some(err),
1102            DialError::NoAddresses => None,
1103            DialError::Banned => None
1104        }
1105    }
1106}
1107
/// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything.
///
/// It uses the dummy protocols handler, reports no addresses for any peer,
/// ignores every injected notification, and its `poll` is always pending.
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
1112
1113impl NetworkBehaviour for DummyBehaviour {
1114    type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
1115    type OutEvent = void::Void;
1116
1117    fn new_handler(&mut self) -> Self::ProtocolsHandler {
1118        protocols_handler::DummyProtocolsHandler::default()
1119    }
1120
1121    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
1122        Vec::new()
1123    }
1124
1125    fn inject_connected(&mut self, _: &PeerId) {}
1126
1127    fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
1128
1129    fn inject_disconnected(&mut self, _: &PeerId) {}
1130
1131    fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
1132
1133    fn inject_event(&mut self, _: PeerId, _: ConnectionId,
1134        _: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
1135
1136    fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) ->
1137        Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
1138        ProtocolsHandler>::InEvent, Self::OutEvent>>
1139    {
1140        Poll::Pending
1141    }
1142}
1143
#[cfg(test)]
mod tests {
    use crate::protocols_handler::DummyProtocolsHandler;
    use crate::test::{MockBehaviour, CallTraceBehaviour};
    use futures::{future, executor};
    use mwc_libp2p_core::{
        identity,
        upgrade,
        multiaddr,
        transport
    };
    use mwc_libp2p_noise as noise;
    use super::*;

    /// Builds a swarm over an in-memory transport (noise + mplex) with a
    /// freshly generated ed25519 identity, wrapping a `MockBehaviour` in a
    /// `CallTraceBehaviour`.
    fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
    where
        T: ProtocolsHandler + Clone,
        T::OutEvent: Clone,
        O: Send + 'static
    {
        let id_keys = identity::Keypair::generate_ed25519();
        let local_peer_id = PeerId::from_public_key(id_keys.public());

        let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
            .into_authentic(&id_keys)
            .unwrap();
        let noise_config = noise::NoiseConfig::xx(noise_keys).into_authenticated();

        let transport = transport::MemoryTransport::default()
            .upgrade(upgrade::Version::V1)
            .authenticate(noise_config)
            .multiplex(mwc_libp2p_mplex::MplexConfig::new())
            .boxed();

        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
        SwarmBuilder::new(transport, behaviour, local_peer_id).build()
    }

    /// Establishes a number of connections between two peers,
    /// after which one peer bans the other.
    ///
    /// Both behaviours are expected to be notified via matching pairs of
    /// inject_connected / inject_disconnected calls as well as
    /// inject_connection_established / inject_connection_closed calls.
    #[test]
    fn test_connect_disconnect_ban() {
        // No substreams are opened in this test, so the dummy protocols
        // handler is sufficient.
        let mut handler_proto = DummyProtocolsHandler::default();
        handler_proto.keep_alive = KeepAlive::Yes;

        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
        Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();

        // Test execution state. Connecting => Disconnecting => Connecting.
        enum State {
            Connecting,
            Disconnecting,
        }

        let swarm1_id = Swarm::local_peer_id(&swarm1).clone();

        let num_connections = 10;
        let mut banned = false;
        let mut unbanned = false;

        for _ in 0 .. num_connections {
            Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        executor::block_on(future::poll_fn(move |cx| {
            loop {
                let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
                let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);

                match state {
                    State::Connecting => {
                        for s in &[&swarm1, &swarm2] {
                            // `inject_connected` fires exactly once, together
                            // with the first established connection.
                            let expected_connected =
                                if s.behaviour.inject_connection_established.len() > 0 { 1 } else { 0 };
                            assert_eq!(s.behaviour.inject_connected.len(), expected_connected);
                            assert!(s.behaviour.inject_connection_closed.len() == 0);
                            assert!(s.behaviour.inject_disconnected.len() == 0);
                        }

                        let all_established = [&swarm1, &swarm2].iter().all(|s| {
                            s.behaviour.inject_connection_established.len() == num_connections
                        });
                        if all_established {
                            // Second time around (after the unban): done.
                            if banned {
                                return Poll::Ready(())
                            }
                            Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
                            swarm1.behaviour.reset();
                            swarm2.behaviour.reset();
                            banned = true;
                            state = State::Disconnecting;
                        }
                    }
                    State::Disconnecting => {
                        for s in &[&swarm1, &swarm2] {
                            // `inject_disconnected` fires exactly once, after
                            // the last connection has been closed.
                            let expected_disconnected =
                                if s.behaviour.inject_connection_closed.len() < num_connections { 0 } else { 1 };
                            assert_eq!(s.behaviour.inject_disconnected.len(), expected_disconnected);
                            assert_eq!(s.behaviour.inject_connection_established.len(), 0);
                            assert_eq!(s.behaviour.inject_connected.len(), 0);
                        }

                        let all_closed = [&swarm1, &swarm2].iter().all(|s| {
                            s.behaviour.inject_connection_closed.len() == num_connections
                        });
                        if all_closed {
                            if unbanned {
                                return Poll::Ready(())
                            }
                            // Unban the first peer and reconnect.
                            Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
                            swarm1.behaviour.reset();
                            swarm2.behaviour.reset();
                            unbanned = true;
                            for _ in 0 .. num_connections {
                                Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
                            }
                            state = State::Connecting;
                        }
                    }
                }

                // Nothing more to drive right now; wait for a wake-up.
                if poll1.is_pending() && poll2.is_pending() {
                    return Poll::Pending
                }
            }
        }))
    }
}