libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High level manager of the network.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that
35//!     will be used in order to reach nodes on the network based on their
36//!     address. See the `transport` module for more information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state
38//!     machine that defines how the swarm should behave once it is connected
39//!     to a node.
40//!
41//! # Network Behaviour
42//!
43//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
44//! the swarm how it should behave. This includes which protocols are supported
45//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
46//! controls what happens on the network. Multiple types that implement
47//! `NetworkBehaviour` can be composed into a single behaviour.
48//!
49//! # Protocols Handler
50//!
51//! The [`ProtocolsHandler`] trait defines how each active connection to a
52//! remote should behave: how to handle incoming substreams, which protocols
53//! are supported, when to open a new outbound substream, etc.
54//!
55
56mod behaviour;
57mod registry;
58#[cfg(test)]
59mod test;
60mod upgrade;
61
62pub mod protocols_handler;
63pub mod toggle;
64
65pub use behaviour::{
66    NetworkBehaviour,
67    NetworkBehaviourAction,
68    NetworkBehaviourEventProcess,
69    PollParameters,
70    NotifyHandler,
71    DialPeerCondition
72};
73pub use protocols_handler::{
74    IntoProtocolsHandler,
75    IntoProtocolsHandlerSelect,
76    KeepAlive,
77    ProtocolsHandler,
78    ProtocolsHandlerEvent,
79    ProtocolsHandlerSelect,
80    ProtocolsHandlerUpgrErr,
81    OneShotHandler,
82    OneShotHandlerConfig,
83    SubstreamProtocol
84};
85pub use registry::{AddressScore, AddressRecord, AddAddressResult};
86
87use protocols_handler::{
88    NodeHandlerWrapperBuilder,
89    NodeHandlerWrapperError,
90};
91use futures::{
92    prelude::*,
93    executor::ThreadPoolBuilder,
94    stream::FusedStream,
95};
96use libp2p_core::{
97    Executor,
98    Transport,
99    Multiaddr,
100    Negotiated,
101    PeerId,
102    connection::{
103        ConnectionError,
104        ConnectionId,
105        ConnectionLimit,
106        ConnectedPoint,
107        EstablishedConnection,
108        IntoConnectionHandler,
109        ListenerId,
110        PendingConnectionError,
111        Substream
112    },
113    transport::{self, TransportError},
114    muxing::StreamMuxerBox,
115    network::{
116        ConnectionLimits,
117        Network,
118        NetworkInfo,
119        NetworkEvent,
120        NetworkConfig,
121        peer::ConnectedPeer,
122    },
123    upgrade::{ProtocolName},
124};
125use registry::{Addresses, AddressIntoIter};
126use smallvec::SmallVec;
127use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
128use std::collections::HashSet;
129use std::num::{NonZeroU32, NonZeroUsize};
130use upgrade::UpgradeInfoSend as _;
131
132/// Contains the state of the network, plus the way it should behave.
133pub type Swarm<TBehaviour> = ExpandedSwarm<
134    TBehaviour,
135    <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
136    <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
137    <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
138>;
139
140/// Substream for which a protocol has been chosen.
141///
142/// Implements the [`AsyncRead`](futures::io::AsyncRead) and
143/// [`AsyncWrite`](futures::io::AsyncWrite) traits.
144pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
145
146/// Event generated by the `Swarm`.
147#[derive(Debug)]
148pub enum SwarmEvent<TBvEv, THandleErr> {
149    /// Event generated by the `NetworkBehaviour`.
150    Behaviour(TBvEv),
151    /// A connection to the given peer has been opened.
152    ConnectionEstablished {
153        /// Identity of the peer that we have connected to.
154        peer_id: PeerId,
155        /// Endpoint of the connection that has been opened.
156        endpoint: ConnectedPoint,
157        /// Number of established connections to this peer, including the one that has just been
158        /// opened.
159        num_established: NonZeroU32,
160    },
161    /// A connection with the given peer has been closed,
162    /// possibly as a result of an error.
163    ConnectionClosed {
164        /// Identity of the peer that we have connected to.
165        peer_id: PeerId,
166        /// Endpoint of the connection that has been closed.
167        endpoint: ConnectedPoint,
168        /// Number of other remaining connections to this same peer.
169        num_established: u32,
170        /// Reason for the disconnection, if it was not a successful
171        /// active close.
172        cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
173    },
174    /// A new connection arrived on a listener and is in the process of protocol negotiation.
175    ///
176    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished),
177    /// [`BannedPeer`](SwarmEvent::BannedPeer), or
178    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
179    /// generated for this connection.
180    IncomingConnection {
181        /// Local connection address.
182        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
183        /// event.
184        local_addr: Multiaddr,
185        /// Address used to send back data to the remote.
186        send_back_addr: Multiaddr,
187    },
188    /// An error happened on a connection during its initial handshake.
189    ///
190    /// This can include, for example, an error during the handshake of the encryption layer, or
191    /// the connection unexpectedly closed.
192    IncomingConnectionError {
193        /// Local connection address.
194        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
195        /// event.
196        local_addr: Multiaddr,
197        /// Address used to send back data to the remote.
198        send_back_addr: Multiaddr,
199        /// The error that happened.
200        error: PendingConnectionError<io::Error>,
201    },
202    /// We connected to a peer, but we immediately closed the connection because that peer is banned.
203    BannedPeer {
204        /// Identity of the banned peer.
205        peer_id: PeerId,
206        /// Endpoint of the connection that has been closed.
207        endpoint: ConnectedPoint,
208    },
209    /// Tried to dial an address but it ended up being unreachaable.
210    UnreachableAddr {
211        /// `PeerId` that we were trying to reach.
212        peer_id: PeerId,
213        /// Address that we failed to reach.
214        address: Multiaddr,
215        /// Error that has been encountered.
216        error: PendingConnectionError<io::Error>,
217        /// Number of remaining connection attempts that are being tried for this peer.
218        attempts_remaining: u32,
219    },
220    /// Tried to dial an address but it ended up being unreachaable.
221    /// Contrary to `UnreachableAddr`, we don't know the identity of the peer that we were trying
222    /// to reach.
223    UnknownPeerUnreachableAddr {
224        /// Address that we failed to reach.
225        address: Multiaddr,
226        /// Error that has been encountered.
227        error: PendingConnectionError<io::Error>,
228    },
229    /// One of our listeners has reported a new local listening address.
230    NewListenAddr(Multiaddr),
231    /// One of our listeners has reported the expiration of a listening address.
232    ExpiredListenAddr(Multiaddr),
233    /// One of the listeners gracefully closed.
234    ListenerClosed {
235        /// The addresses that the listener was listening on. These addresses are now considered
236        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
237        /// has been generated for each of them.
238        addresses: Vec<Multiaddr>,
239        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
240        /// if the stream produced an error.
241        reason: Result<(), io::Error>,
242    },
243    /// One of the listeners reported a non-fatal error.
244    ListenerError {
245        /// The listener error.
246        error: io::Error,
247    },
248    /// A new dialing attempt has been initiated.
249    ///
250    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished)
251    /// event is reported if the dialing attempt succeeds, otherwise a
252    /// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported
253    /// with `attempts_remaining` equal to 0.
254    Dialing(PeerId),
255}
256
257/// Contains the state of the network, plus the way it should behave.
258pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
259where
260    THandler: IntoProtocolsHandler,
261{
262    network: Network<
263        transport::Boxed<(PeerId, StreamMuxerBox)>,
264        TInEvent,
265        TOutEvent,
266        NodeHandlerWrapperBuilder<THandler>,
267    >,
268
269    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
270    /// handlers.
271    behaviour: TBehaviour,
272
273    /// List of protocols that the behaviour says it supports.
274    supported_protocols: SmallVec<[Vec<u8>; 16]>,
275
276    /// List of multiaddresses we're listening on.
277    listened_addrs: SmallVec<[Multiaddr; 8]>,
278
279    /// List of multiaddresses we're listening on, after account for external IP addresses and
280    /// similar mechanisms.
281    external_addrs: Addresses,
282
283    /// List of nodes for which we deny any incoming connection.
284    banned_peers: HashSet<PeerId>,
285
286    /// Pending event to be delivered to connection handlers
287    /// (or dropped if the peer disconnected) before the `behaviour`
288    /// can be polled again.
289    pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,
290
291    /// The configured override for substream protocol upgrades, if any.
292    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
293}
294
295impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
296    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
297where
298    THandler: IntoProtocolsHandler,
299{
300    type Target = TBehaviour;
301
302    fn deref(&self) -> &Self::Target {
303        &self.behaviour
304    }
305}
306
307impl<TBehaviour, TInEvent, TOutEvent, THandler> DerefMut for
308    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
309where
310    THandler: IntoProtocolsHandler,
311{
312    fn deref_mut(&mut self) -> &mut Self::Target {
313        &mut self.behaviour
314    }
315}
316
317impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
318    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
319where
320    THandler: IntoProtocolsHandler,
321{
322}
323
324impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
325    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
326where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
327      TInEvent: Send + 'static,
328      TOutEvent: Send + 'static,
329      THandler: IntoProtocolsHandler + Send + 'static,
330      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
331      THandleErr: error::Error + Send + 'static,
332{
333    /// Builds a new `Swarm`.
334    pub fn new(
335        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
336        behaviour: TBehaviour,
337        local_peer_id: PeerId
338    ) -> Self {
339        SwarmBuilder::new(transport, behaviour, local_peer_id).build()
340    }
341
342    /// Returns information about the [`Network`] underlying the `Swarm`.
343    pub fn network_info(me: &Self) -> NetworkInfo {
344        me.network.info()
345    }
346
347    /// Starts listening on the given address.
348    ///
349    /// Returns an error if the address is not supported.
350    pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
351        me.network.listen_on(addr)
352    }
353
354    /// Remove some listener.
355    ///
356    /// Returns `Ok(())` if there was a listener with this ID.
357    pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
358        me.network.remove_listener(id)
359    }
360
361    /// Initiates a new dialing attempt to the given address.
362    pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
363        let handler = me.behaviour.new_handler()
364            .into_node_handler_builder()
365            .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
366        me.network.dial(&addr, handler).map(|_id| ())
367    }
368
369    /// Initiates a new dialing attempt to the given peer.
370    pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
371        if me.banned_peers.contains(peer_id) {
372            me.behaviour.inject_dial_failure(peer_id);
373            return Err(DialError::Banned)
374        }
375
376        let self_listening = &me.listened_addrs;
377        let mut addrs = me.behaviour.addresses_of_peer(peer_id)
378            .into_iter()
379            .filter(|a| !self_listening.contains(a));
380
381        let result =
382            if let Some(first) = addrs.next() {
383                let handler = me.behaviour.new_handler()
384                    .into_node_handler_builder()
385                    .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
386                me.network.peer(*peer_id)
387                    .dial(first, addrs, handler)
388                    .map(|_| ())
389                    .map_err(DialError::ConnectionLimit)
390            } else {
391                Err(DialError::NoAddresses)
392            };
393
394        if let Err(error) = &result {
395            log::debug!(
396                "New dialing attempt to peer {:?} failed: {:?}.",
397                peer_id, error);
398            me.behaviour.inject_dial_failure(&peer_id);
399        }
400
401        result
402    }
403
404    /// Returns an iterator that produces the list of addresses we're listening on.
405    pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
406        me.network.listen_addrs()
407    }
408
409    /// Returns the peer ID of the swarm passed as parameter.
410    pub fn local_peer_id(me: &Self) -> &PeerId {
411        me.network.local_peer_id()
412    }
413
414    /// Returns an iterator for [`AddressRecord`]s of external addresses
415    /// of the local node, in decreasing order of their current
416    /// [score](AddressScore).
417    pub fn external_addresses(me: &Self) -> impl Iterator<Item = &AddressRecord> {
418        me.external_addrs.iter()
419    }
420
421    /// Adds an external address record for the local node.
422    ///
423    /// An external address is an address of the local node known to
424    /// be (likely) reachable for other nodes, possibly taking into
425    /// account NAT. The external addresses of the local node may be
426    /// shared with other nodes by the `NetworkBehaviour`.
427    ///
428    /// The associated score determines both the position of the address
429    /// in the list of external addresses (which can determine the
430    /// order in which addresses are used to connect to) as well as
431    /// how long the address is retained in the list, depending on
432    /// how frequently it is reported by the `NetworkBehaviour` via
433    /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
434    /// through this method.
435    pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
436        me.external_addrs.add(a, s)
437    }
438
439    /// Removes an external address of the local node, regardless of
440    /// its current score. See [`ExpandedSwarm::add_external_address`]
441    /// for details.
442    ///
443    /// Returns `true` if the address existed and was removed, `false`
444    /// otherwise.
445    pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool {
446        me.external_addrs.remove(addr)
447    }
448
449    /// Bans a peer by its peer ID.
450    ///
451    /// Any incoming connection and any dialing attempt will immediately be rejected.
452    /// This function has no effect if the peer is already banned.
453    pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
454        if me.banned_peers.insert(peer_id) {
455            if let Some(peer) = me.network.peer(peer_id).into_connected() {
456                peer.disconnect();
457            }
458        }
459    }
460
461    /// Unbans a peer.
462    pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
463        me.banned_peers.remove(&peer_id);
464    }
465
466    /// Checks whether the [`Network`] has an established connection to a peer.
467    pub fn is_connected(me: &Self, peer_id: &PeerId) -> bool {
468        me.network.is_connected(peer_id)
469    }
470
471    /// Returns the next event that happens in the `Swarm`.
472    ///
473    /// Includes events from the `NetworkBehaviour` but also events about the connections status.
474    pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
475        future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
476    }
477
478    /// Returns the next event produced by the [`NetworkBehaviour`].
479    pub async fn next(&mut self) -> TBehaviour::OutEvent {
480        future::poll_fn(move |cx| {
481            loop {
482                let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
483                if let SwarmEvent::Behaviour(event) = event {
484                    return Poll::Ready(event);
485                }
486            }
487        }).await
488    }
489
490    /// Internal function used by everything event-related.
491    ///
492    /// Polls the `Swarm` for the next event.
493    fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
494        -> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
495    {
496        // We use a `this` variable because the compiler can't mutably borrow multiple times
497        // across a `Deref`.
498        let this = &mut *self;
499
500        loop {
501            let mut network_not_ready = false;
502
503            // First let the network make progress.
504            match this.network.poll(cx) {
505                Poll::Pending => network_not_ready = true,
506                Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
507                    let peer = connection.peer_id();
508                    let connection = connection.id();
509                    this.behaviour.inject_event(peer, connection, event);
510                },
511                Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
512                    let peer = connection.peer_id();
513                    let connection = connection.id();
514                    this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
515                },
516                Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
517                    let peer_id = connection.peer_id();
518                    let endpoint = connection.endpoint().clone();
519                    if this.banned_peers.contains(&peer_id) {
520                        this.network.peer(peer_id)
521                            .into_connected()
522                            .expect("the Network just notified us that we were connected; QED")
523                            .disconnect();
524                        return Poll::Ready(SwarmEvent::BannedPeer {
525                            peer_id,
526                            endpoint,
527                        });
528                    } else {
529                        log::debug!("Connection established: {:?}; Total (peer): {}.",
530                            connection.connected(), num_established);
531                        let endpoint = connection.endpoint().clone();
532                        this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
533                        if num_established.get() == 1 {
534                            this.behaviour.inject_connected(&peer_id);
535                        }
536                        return Poll::Ready(SwarmEvent::ConnectionEstablished {
537                            peer_id, num_established, endpoint
538                        });
539                    }
540                },
541                Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
542                    if let Some(error) = error.as_ref() {
543                        log::debug!("Connection {:?} closed: {:?}", connected, error);
544                    } else {
545                        log::debug!("Connection {:?} closed (active close).", connected);
546                    }
547                    let peer_id = connected.peer_id;
548                    let endpoint = connected.endpoint;
549                    this.behaviour.inject_connection_closed(&peer_id, &id, &endpoint);
550                    if num_established == 0 {
551                        this.behaviour.inject_disconnected(&peer_id);
552                    }
553                    return Poll::Ready(SwarmEvent::ConnectionClosed {
554                        peer_id,
555                        endpoint,
556                        cause: error,
557                        num_established,
558                    });
559                },
560                Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
561                    let handler = this.behaviour.new_handler()
562                        .into_node_handler_builder()
563                        .with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
564                    let local_addr = connection.local_addr.clone();
565                    let send_back_addr = connection.send_back_addr.clone();
566                    if let Err(e) = this.network.accept(connection, handler) {
567                        log::warn!("Incoming connection rejected: {:?}", e);
568                    }
569                    return Poll::Ready(SwarmEvent::IncomingConnection {
570                        local_addr,
571                        send_back_addr,
572                    });
573                },
574                Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
575                    log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
576                    if !this.listened_addrs.contains(&listen_addr) {
577                        this.listened_addrs.push(listen_addr.clone())
578                    }
579                    this.behaviour.inject_new_listen_addr(&listen_addr);
580                    return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
581                }
582                Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
583                    log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
584                    this.listened_addrs.retain(|a| a != &listen_addr);
585                    this.behaviour.inject_expired_listen_addr(&listen_addr);
586                    return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
587                }
588                Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
589                    log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
590                    for addr in addresses.iter() {
591                        this.behaviour.inject_expired_listen_addr(addr);
592                    }
593                    this.behaviour.inject_listener_closed(listener_id, match &reason {
594                        Ok(()) => Ok(()),
595                        Err(err) => Err(err),
596                    });
597                    return Poll::Ready(SwarmEvent::ListenerClosed {
598                        addresses,
599                        reason,
600                    });
601                }
602                Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
603                    this.behaviour.inject_listener_error(listener_id, &error);
604                    return Poll::Ready(SwarmEvent::ListenerError {
605                        error,
606                    });
607                },
608                Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
609                    log::debug!("Incoming connection failed: {:?}", error);
610                    return Poll::Ready(SwarmEvent::IncomingConnectionError {
611                        local_addr,
612                        send_back_addr,
613                        error,
614                    });
615                },
616                Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
617                    log::debug!(
618                        "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
619                        peer_id, multiaddr, error, attempts_remaining);
620                    this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
621                    if attempts_remaining == 0 {
622                        this.behaviour.inject_dial_failure(&peer_id);
623                    }
624                    return Poll::Ready(SwarmEvent::UnreachableAddr {
625                        peer_id,
626                        address: multiaddr,
627                        error,
628                        attempts_remaining,
629                    });
630                },
631                Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
632                    log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
633                        multiaddr, error);
634                    this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
635                    return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
636                        address: multiaddr,
637                        error,
638                    });
639                },
640            }
641
642            // After the network had a chance to make progress, try to deliver
643            // the pending event emitted by the behaviour in the previous iteration
644            // to the connection handler(s). The pending event must be delivered
645            // before polling the behaviour again. If the targeted peer
646            // meanwhie disconnected, the event is discarded.
647            if let Some((peer_id, handler, event)) = this.pending_event.take() {
648                if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
649                    match handler {
650                        PendingNotifyHandler::One(conn_id) =>
651                            if let Some(mut conn) = peer.connection(conn_id) {
652                                if let Some(event) = notify_one(&mut conn, event, cx) {
653                                    this.pending_event = Some((peer_id, handler, event));
654                                    return Poll::Pending
655                                }
656                            },
657                        PendingNotifyHandler::Any(ids) => {
658                            if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
659                                let handler = PendingNotifyHandler::Any(ids);
660                                this.pending_event = Some((peer_id, handler, event));
661                                return Poll::Pending
662                            }
663                        }
664                    }
665                }
666            }
667
668            debug_assert!(this.pending_event.is_none());
669
670            let behaviour_poll = {
671                let mut parameters = SwarmPollParameters {
672                    local_peer_id: &mut this.network.local_peer_id(),
673                    supported_protocols: &this.supported_protocols,
674                    listened_addrs: &this.listened_addrs,
675                    external_addrs: &this.external_addrs
676                };
677                this.behaviour.poll(cx, &mut parameters)
678            };
679
680            match behaviour_poll {
681                Poll::Pending if network_not_ready => return Poll::Pending,
682                Poll::Pending => (),
683                Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
684                    return Poll::Ready(SwarmEvent::Behaviour(event))
685                },
686                Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
687                    let _ = ExpandedSwarm::dial_addr(&mut *this, address);
688                },
689                Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
690                    if this.banned_peers.contains(&peer_id) {
691                        this.behaviour.inject_dial_failure(&peer_id);
692                    } else {
693                        let condition_matched = match condition {
694                            DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id),
695                            DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id),
696                            DialPeerCondition::Always => true,
697                        };
698                        if condition_matched {
699                            if ExpandedSwarm::dial(this, &peer_id).is_ok() {
700                                return Poll::Ready(SwarmEvent::Dialing(peer_id))
701                            }
702                        } else {
703                            // Even if the condition for a _new_ dialing attempt is not met,
704                            // we always add any potentially new addresses of the peer to an
705                            // ongoing dialing attempt, if there is one.
706                            log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
707                                peer_id, condition);
708                            let self_listening = &this.listened_addrs;
709                            if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
710                                let addrs = this.behaviour.addresses_of_peer(peer.id());
711                                let mut attempt = peer.some_attempt();
712                                for a in addrs {
713                                    if !self_listening.contains(&a) {
714                                        attempt.add_address(a);
715                                    }
716                                }
717                            }
718                        }
719                    }
720                },
721                Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
722                    if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
723                        match handler {
724                            NotifyHandler::One(connection) => {
725                                if let Some(mut conn) = peer.connection(connection) {
726                                    if let Some(event) = notify_one(&mut conn, event, cx) {
727                                        let handler = PendingNotifyHandler::One(connection);
728                                        this.pending_event = Some((peer_id, handler, event));
729                                        return Poll::Pending
730                                    }
731                                }
732                            }
733                            NotifyHandler::Any => {
734                                let ids = peer.connections().into_ids().collect();
735                                if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
736                                    let handler = PendingNotifyHandler::Any(ids);
737                                    this.pending_event = Some((peer_id, handler, event));
738                                    return Poll::Pending
739                                }
740                            }
741                        }
742                    }
743                },
744                Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
745                    for addr in this.network.address_translation(&address) {
746                        if this.external_addrs.iter().all(|a| a.addr != addr) {
747                            this.behaviour.inject_new_external_addr(&addr);
748                        }
749                        this.external_addrs.add(addr, score);
750                    }
751                },
752            }
753        }
754    }
755}
756
757/// Connection to notify of a pending event.
758///
759/// The connection IDs out of which to notify one of an event are captured at
760/// the time the behaviour emits the event, in order not to forward the event to
761/// a new connection which the behaviour may not have been aware of at the time
762/// it issued the request for sending it.
763enum PendingNotifyHandler {
764    One(ConnectionId),
765    Any(SmallVec<[ConnectionId; 10]>),
766}
767
768/// Notify a single connection of an event.
769///
770/// Returns `Some` with the given event if the connection is not currently
771/// ready to receive another event, in which case the current task is
772/// scheduled to be woken up.
773///
774/// Returns `None` if the connection is closing or the event has been
775/// successfully sent, in either case the event is consumed.
776fn notify_one<'a, TInEvent>(
777    conn: &mut EstablishedConnection<'a, TInEvent>,
778    event: TInEvent,
779    cx: &mut Context<'_>,
780) -> Option<TInEvent>
781{
782    match conn.poll_ready_notify_handler(cx) {
783        Poll::Pending => Some(event),
784        Poll::Ready(Err(())) => None, // connection is closing
785        Poll::Ready(Ok(())) => {
786            // Can now only fail if connection is closing.
787            let _ = conn.notify_handler(event);
788            None
789        }
790    }
791}
792
793/// Notify any one of a given list of connections of a peer of an event.
794///
795/// Returns `Some` with the given event and a new list of connections if
796/// none of the given connections was able to receive the event but at
797/// least one of them is not closing, in which case the current task
798/// is scheduled to be woken up. The returned connections are those which
799/// may still become ready to receive another event.
800///
801/// Returns `None` if either all connections are closing or the event
802/// was successfully sent to a handler, in either case the event is consumed.
803fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
804    ids: SmallVec<[ConnectionId; 10]>,
805    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
806    event: TInEvent,
807    cx: &mut Context<'_>,
808) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
809where
810    TTrans: Transport,
811    THandler: IntoConnectionHandler,
812{
813    let mut pending = SmallVec::new();
814    let mut event = Some(event); // (1)
815    for id in ids.into_iter() {
816        if let Some(mut conn) = peer.connection(id) {
817            match conn.poll_ready_notify_handler(cx) {
818                Poll::Pending => pending.push(id),
819                Poll::Ready(Err(())) => {} // connection is closing
820                Poll::Ready(Ok(())) => {
821                    let e = event.take().expect("by (1),(2)");
822                    if let Err(e) = conn.notify_handler(e) {
823                        event = Some(e) // (2)
824                    } else {
825                        break
826                    }
827                }
828            }
829        }
830    }
831
832    event.and_then(|e|
833        if !pending.is_empty() {
834            Some((e, pending))
835        } else {
836            None
837        })
838}
839
840impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
841    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
842where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
843      THandler: IntoProtocolsHandler + Send + 'static,
844      TInEvent: Send + 'static,
845      TOutEvent: Send + 'static,
846      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
847{
848    type Item = TBehaviour::OutEvent;
849
850    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
851        loop {
852            let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
853            if let SwarmEvent::Behaviour(event) = event {
854                return Poll::Ready(Some(event));
855            }
856        }
857    }
858}
859
860/// the stream of behaviour events never terminates, so we can implement fused for it
861impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
862    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
863where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
864      THandler: IntoProtocolsHandler + Send + 'static,
865      TInEvent: Send + 'static,
866      TOutEvent: Send + 'static,
867      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
868{
869    fn is_terminated(&self) -> bool {
870        false
871    }
872}
873
874/// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
875// TODO: #[derive(Debug)]
876pub struct SwarmPollParameters<'a> {
877    local_peer_id: &'a PeerId,
878    supported_protocols: &'a [Vec<u8>],
879    listened_addrs: &'a [Multiaddr],
880    external_addrs: &'a Addresses,
881}
882
883impl<'a> PollParameters for SwarmPollParameters<'a> {
884    type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
885    type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
886    type ExternalAddressesIter = AddressIntoIter;
887
888    fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
889        self.supported_protocols.to_vec().into_iter()
890    }
891
892    fn listened_addresses(&self) -> Self::ListenedAddressesIter {
893        self.listened_addrs.to_vec().into_iter()
894    }
895
896    fn external_addresses(&self) -> Self::ExternalAddressesIter {
897        self.external_addrs.clone().into_iter()
898    }
899
900    fn local_peer_id(&self) -> &PeerId {
901        &self.local_peer_id
902    }
903}
904
905/// A `SwarmBuilder` provides an API for configuring and constructing a `Swarm`,
906/// including the underlying [`Network`].
907pub struct SwarmBuilder<TBehaviour> {
908    local_peer_id: PeerId,
909    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
910    behaviour: TBehaviour,
911    network_config: NetworkConfig,
912    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
913}
914
915impl<TBehaviour> SwarmBuilder<TBehaviour>
916where TBehaviour: NetworkBehaviour,
917{
918    /// Creates a new `SwarmBuilder` from the given transport, behaviour and
919    /// local peer ID. The `Swarm` with its underlying `Network` is obtained
920    /// via [`SwarmBuilder::build`].
921    pub fn new(
922        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
923        behaviour: TBehaviour,
924        local_peer_id: PeerId
925    ) -> Self {
926        SwarmBuilder {
927            local_peer_id,
928            transport,
929            behaviour,
930            network_config: Default::default(),
931            substream_upgrade_protocol_override: None,
932        }
933    }
934
935    /// Configures the `Executor` to use for spawning background tasks.
936    ///
937    /// By default, unless another executor has been configured,
938    /// [`SwarmBuilder::build`] will try to set up a `ThreadPool`.
939    pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
940        self.network_config = self.network_config.with_executor(e);
941        self
942    }
943
944    /// Configures the number of events from the [`NetworkBehaviour`] in
945    /// destination to the [`ProtocolsHandler`] that can be buffered before
946    /// the [`Swarm`] has to wait. An individual buffer with this number of
947    /// events exists for each individual connection.
948    ///
949    /// The ideal value depends on the executor used, the CPU speed, and the
950    /// volume of events. If this value is too low, then the [`Swarm`] will
951    /// be sleeping more often than necessary. Increasing this value increases
952    /// the overall memory usage.
953    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
954        self.network_config = self.network_config.with_notify_handler_buffer_size(n);
955        self
956    }
957
958    /// Configures the number of extra events from the [`ProtocolsHandler`] in
959    /// destination to the [`NetworkBehaviour`] that can be buffered before
960    /// the [`ProtocolsHandler`] has to go to sleep.
961    ///
962    /// There exists a buffer of events received from [`ProtocolsHandler`]s
963    /// that the [`NetworkBehaviour`] has yet to process. This buffer is
964    /// shared between all instances of [`ProtocolsHandler`]. Each instance of
965    /// [`ProtocolsHandler`] is guaranteed one slot in this buffer, meaning
966    /// that delivering an event for the first time is guaranteed to be
967    /// instantaneous. Any extra event delivery, however, must wait for that
968    /// first event to be delivered or for an "extra slot" to be available.
969    ///
970    /// This option configures the number of such "extra slots" in this
971    /// shared buffer. These extra slots are assigned in a first-come,
972    /// first-served basis.
973    ///
974    /// The ideal value depends on the executor used, the CPU speed, the
975    /// average number of connections, and the volume of events. If this value
976    /// is too low, then the [`ProtocolsHandler`]s will be sleeping more often
977    /// than necessary. Increasing this value increases the overall memory
978    /// usage, and more importantly the latency between the moment when an
979    /// event is emitted and the moment when it is received by the
980    /// [`NetworkBehaviour`].
981    pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
982        self.network_config = self.network_config.with_connection_event_buffer_size(n);
983        self
984    }
985
986    /// Configures the connection limits.
987    pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
988        self.network_config = self.network_config.with_connection_limits(limits);
989        self
990    }
991
992    /// Configures an override for the substream upgrade protocol to use.
993    ///
994    /// The subtream upgrade protocol is the multistream-select protocol
995    /// used for protocol negotiation on substreams. Since a listener
996    /// supports all existing versions, the choice of upgrade protocol
997    /// only effects the "dialer", i.e. the peer opening a substream.
998    ///
999    /// > **Note**: If configured, specific upgrade protocols for
1000    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1001    /// > are ignored.
1002    pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
1003        self.substream_upgrade_protocol_override = Some(v);
1004        self
1005    }
1006
1007    /// Builds a `Swarm` with the current configuration.
1008    pub fn build(mut self) -> Swarm<TBehaviour> {
1009        let supported_protocols = self.behaviour
1010            .new_handler()
1011            .inbound_protocol()
1012            .protocol_info()
1013            .into_iter()
1014            .map(|info| info.protocol_name().to_vec())
1015            .collect();
1016
1017        // If no executor has been explicitly configured, try to set up a thread pool.
1018        let network_cfg = self.network_config.or_else_with_executor(|| {
1019            match ThreadPoolBuilder::new()
1020                .name_prefix("libp2p-swarm-task-")
1021                .create()
1022            {
1023                Ok(tp) => {
1024                    Some(Box::new(move |f| tp.spawn_ok(f)))
1025                },
1026                Err(err) => {
1027                    log::warn!("Failed to create executor thread pool: {:?}", err);
1028                    None
1029                }
1030            }
1031        });
1032
1033        let network = Network::new(self.transport, self.local_peer_id, network_cfg);
1034
1035        ExpandedSwarm {
1036            network,
1037            behaviour: self.behaviour,
1038            supported_protocols,
1039            listened_addrs: SmallVec::new(),
1040            external_addrs: Addresses::default(),
1041            banned_peers: HashSet::new(),
1042            pending_event: None,
1043            substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
1044        }
1045    }
1046}
1047
1048/// The possible failures of [`ExpandedSwarm::dial`].
1049#[derive(Debug)]
1050pub enum DialError {
1051    /// The peer is currently banned.
1052    Banned,
1053    /// The configured limit for simultaneous outgoing connections
1054    /// has been reached.
1055    ConnectionLimit(ConnectionLimit),
1056    /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
1057    /// for the peer to dial.
1058    NoAddresses
1059}
1060
1061impl fmt::Display for DialError {
1062    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1063        match self {
1064            DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
1065            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1066            DialError::Banned => write!(f, "Dial error: peer is banned.")
1067        }
1068    }
1069}
1070
1071impl error::Error for DialError {
1072    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1073        match self {
1074            DialError::ConnectionLimit(err) => Some(err),
1075            DialError::NoAddresses => None,
1076            DialError::Banned => None
1077        }
1078    }
1079}
1080
1081/// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything.
1082#[derive(Clone, Default)]
1083pub struct DummyBehaviour {
1084}
1085
1086impl NetworkBehaviour for DummyBehaviour {
1087    type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
1088    type OutEvent = void::Void;
1089
1090    fn new_handler(&mut self) -> Self::ProtocolsHandler {
1091        protocols_handler::DummyProtocolsHandler::default()
1092    }
1093
1094    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
1095        Vec::new()
1096    }
1097
1098    fn inject_connected(&mut self, _: &PeerId) {}
1099
1100    fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
1101
1102    fn inject_disconnected(&mut self, _: &PeerId) {}
1103
1104    fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
1105
1106    fn inject_event(&mut self, _: PeerId, _: ConnectionId,
1107        _: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
1108
1109    fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) ->
1110        Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
1111        ProtocolsHandler>::InEvent, Self::OutEvent>>
1112    {
1113        Poll::Pending
1114    }
1115}
1116
1117#[cfg(test)]
1118mod tests {
1119    use crate::protocols_handler::DummyProtocolsHandler;
1120    use crate::test::{MockBehaviour, CallTraceBehaviour};
1121    use futures::{future, executor};
1122    use libp2p_core::{
1123        identity,
1124        upgrade,
1125        multiaddr,
1126        transport
1127    };
1128    use libp2p_noise as noise;
1129    use super::*;
1130
1131    fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
1132    where
1133        T: ProtocolsHandler + Clone,
1134        T::OutEvent: Clone,
1135        O: Send + 'static
1136    {
1137        let id_keys = identity::Keypair::generate_ed25519();
1138        let pubkey = id_keys.public();
1139        let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
1140        let transport = transport::MemoryTransport::default()
1141            .upgrade(upgrade::Version::V1)
1142            .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
1143            .multiplex(libp2p_mplex::MplexConfig::new())
1144            .boxed();
1145        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
1146        SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
1147    }
1148
1149    /// Establishes a number of connections between two peers,
1150    /// after which one peer bans the other.
1151    ///
1152    /// The test expects both behaviours to be notified via pairs of
1153    /// inject_connected / inject_disconnected as well as
1154    /// inject_connection_established / inject_connection_closed calls.
1155    #[test]
1156    fn test_connect_disconnect_ban() {
1157        // Since the test does not try to open any substreams, we can
1158        // use the dummy protocols handler.
1159        let mut handler_proto = DummyProtocolsHandler::default();
1160        handler_proto.keep_alive = KeepAlive::Yes;
1161
1162        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
1163        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
1164
1165        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1166        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1167
1168        Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
1169        Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();
1170
1171        // Test execution state. Connection => Disconnecting => Connecting.
1172        enum State {
1173            Connecting,
1174            Disconnecting,
1175        }
1176
1177        let swarm1_id = Swarm::local_peer_id(&swarm1).clone();
1178
1179        let mut banned = false;
1180        let mut unbanned = false;
1181
1182        let num_connections = 10;
1183
1184        for _ in 0 .. num_connections {
1185            Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
1186        }
1187        let mut state = State::Connecting;
1188
1189        executor::block_on(future::poll_fn(move |cx| {
1190            loop {
1191                let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1192                let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1193                match state {
1194                    State::Connecting => {
1195                        for s in &[&swarm1, &swarm2] {
1196                            if s.behaviour.inject_connection_established.len() > 0 {
1197                                assert_eq!(s.behaviour.inject_connected.len(), 1);
1198                            } else {
1199                                assert_eq!(s.behaviour.inject_connected.len(), 0);
1200                            }
1201                            assert!(s.behaviour.inject_connection_closed.len() == 0);
1202                            assert!(s.behaviour.inject_disconnected.len() == 0);
1203                        }
1204                        if [&swarm1, &swarm2].iter().all(|s| {
1205                            s.behaviour.inject_connection_established.len() == num_connections
1206                        }) {
1207                            if banned {
1208                                return Poll::Ready(())
1209                            }
1210                            Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
1211                            swarm1.behaviour.reset();
1212                            swarm2.behaviour.reset();
1213                            banned = true;
1214                            state = State::Disconnecting;
1215                        }
1216                    }
1217                    State::Disconnecting => {
1218                        for s in &[&swarm1, &swarm2] {
1219                            if s.behaviour.inject_connection_closed.len() < num_connections {
1220                                assert_eq!(s.behaviour.inject_disconnected.len(), 0);
1221                            } else {
1222                                assert_eq!(s.behaviour.inject_disconnected.len(), 1);
1223                            }
1224                            assert_eq!(s.behaviour.inject_connection_established.len(), 0);
1225                            assert_eq!(s.behaviour.inject_connected.len(), 0);
1226                        }
1227                        if [&swarm1, &swarm2].iter().all(|s| {
1228                            s.behaviour.inject_connection_closed.len() == num_connections
1229                        }) {
1230                            if unbanned {
1231                                return Poll::Ready(())
1232                            }
1233                            // Unban the first peer and reconnect.
1234                            Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
1235                            swarm1.behaviour.reset();
1236                            swarm2.behaviour.reset();
1237                            unbanned = true;
1238                            for _ in 0 .. num_connections {
1239                                Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
1240                            }
1241                            state = State::Connecting;
1242                        }
1243                    }
1244                }
1245
1246                if poll1.is_pending() && poll2.is_pending() {
1247                    return Poll::Pending
1248                }
1249            }
1250        }))
1251    }
1252}