ant_libp2p_swarm/
lib.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! High-level network manager.
22//!
23//! A [`Swarm`] contains the state of the network as a whole. The entire
24//! behaviour of a libp2p network can be controlled through the `Swarm`.
25//! The `Swarm` struct contains all active and pending connections to
26//! remotes and manages the state of all the substreams that have been
27//! opened, and all the upgrades that were built upon these substreams.
28//!
29//! # Initializing a Swarm
30//!
31//! Creating a `Swarm` requires three things:
32//!
33//!  1. A network identity of the local node in form of a [`PeerId`].
34//!  2. An implementation of the [`Transport`] trait. This is the type that will be used in order to
35//!     reach nodes on the network based on their address. See the `transport` module for more
36//!     information.
37//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state machine that defines
38//!     how the swarm should behave once it is connected to a node.
39//!
40//! # Network Behaviour
41//!
42//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
43//! the swarm how it should behave. This includes which protocols are supported
44//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
45//! controls what happens on the network. Multiple types that implement
46//! `NetworkBehaviour` can be composed into a single behaviour.
47//!
//! # Connection Handler
49//!
50//! The [`ConnectionHandler`] trait defines how each active connection to a
51//! remote should behave: how to handle incoming substreams, which protocols
52//! are supported, when to open a new outbound substream, etc.
53
54#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
/// Bundles all symbols required for the [`libp2p_swarm_derive::NetworkBehaviour`] macro.
///
/// The module is hidden from the rendered documentation (`#[doc(hidden)]`); it only contains
/// re-exports.
#[doc(hidden)]
pub mod derive_prelude {
    // Re-exports of items defined in other crates.
    pub use either::Either;
    pub use futures::prelude as futures;
    pub use ant_libp2p_core::{
        transport::{ListenerId, PortUse},
        ConnectedPoint, Endpoint, Multiaddr,
    };
    pub use libp2p_identity::PeerId;

    // Re-exports of items defined in this crate.
    pub use crate::{
        behaviour::{
            AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
            ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
            ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
            NewListener,
        },
        connection::ConnectionId,
        ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
        THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
    };
}
94
95use std::{
96    collections::{HashMap, HashSet, VecDeque},
97    error, fmt, io,
98    num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99    pin::Pin,
100    task::{Context, Poll},
101    time::Duration,
102};
103
104pub use behaviour::{
105    AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106    ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107    ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108    NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112    pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113    IncomingInfo, PendingConnectionError, PendingInboundConnectionError,
114    PendingOutboundConnectionError,
115};
116use dial_opts::{DialOpts, PeerCondition};
117pub use executor::Executor;
118use futures::{prelude::*, stream::FusedStream};
119pub use handler::{
120    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
121    OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
122};
123use ant_libp2p_core::{
124    connection::ConnectedPoint,
125    muxing::StreamMuxerBox,
126    transport::{self, ListenerId, TransportError, TransportEvent},
127    Multiaddr, Transport,
128};
129use libp2p_identity::PeerId;
130#[cfg(feature = "macros")]
131pub use libp2p_swarm_derive::NetworkBehaviour;
132pub use listen_opts::ListenOpts;
133use smallvec::SmallVec;
134pub use stream::Stream;
135pub use stream_protocol::{InvalidProtocol, StreamProtocol};
136use tracing::Instrument;
137#[doc(hidden)]
138pub use translation::_address_translation;
139
140use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
141
/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
///
/// Crate-private shorthand for the behaviour's associated `ToSwarm` type.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;

/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
///
/// Shorthand for the behaviour's associated `ConnectionHandler` type.
pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
///
/// Shorthand for the `FromBehaviour` type of [`THandler`].
pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;

/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
///
/// Shorthand for the `ToBehaviour` type of [`THandler`].
pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
155
/// Event generated by the `Swarm`.
///
/// The enum is `#[non_exhaustive]`: code matching on it outside this crate needs a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum SwarmEvent<TBehaviourOutEvent> {
    /// Event generated by the `NetworkBehaviour`.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been opened.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just been
        /// opened.
        num_established: NonZeroU32,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed concurrently. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
        /// How long it took to establish this connection.
        established_in: std::time::Duration,
    },
    /// A connection with the given peer has been closed,
    /// possibly as a result of an error.
    ConnectionClosed {
        /// Identity of the peer whose connection has been closed.
        peer_id: PeerId,
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of other remaining connections to this same peer.
        num_established: u32,
        /// Reason for the disconnection, if it was not a successful
        /// active close.
        cause: Option<ConnectionError>,
    },
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
    ///
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
    /// generated for this connection.
    IncomingConnection {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
    /// An error happened on an inbound connection during its initial handshake.
    ///
    /// This can include, for example, an error during the handshake of the encryption layer, or
    /// the connection unexpectedly closed.
    IncomingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// Local connection address.
        /// This address has been earlier reported with a
        /// [`NewListenAddr`](SwarmEvent::NewListenAddr) event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: ListenError,
    },
    /// An error happened on an outbound connection.
    OutgoingConnectionError {
        /// Identifier of the connection.
        connection_id: ConnectionId,
        /// If known, [`PeerId`] of the peer we tried to reach.
        peer_id: Option<PeerId>,
        /// Error that has been encountered.
        error: DialError,
    },
    /// One of our listeners has reported a new local listening address.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// One of our listeners has reported the expiration of a listening address.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// One of the listeners gracefully closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that the listener was listening on. These addresses are now considered
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
        /// has been generated for each of them.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), io::Error>,
    },
    /// One of the listeners reported a non-fatal error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
    /// implementation.
    ///
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
    /// reported if the dialing attempt succeeds, otherwise a
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
    /// is reported.
    Dialing {
        /// Identity of the peer that we are connecting to, if known.
        peer_id: Option<PeerId>,

        /// Identifier of the connection.
        connection_id: ConnectionId,
    },
    /// We have discovered a new candidate for an external address for us.
    NewExternalAddrCandidate {
        /// The candidate address.
        address: Multiaddr,
    },
    /// An external address of the local node was confirmed.
    ExternalAddrConfirmed {
        /// The confirmed address.
        address: Multiaddr,
    },
    /// An external address of the local node expired, i.e. is no-longer confirmed.
    ExternalAddrExpired {
        /// The expired address.
        address: Multiaddr,
    },
    /// We have discovered a new address of a peer.
    NewExternalAddrOfPeer {
        /// The peer the address belongs to.
        peer_id: PeerId,
        /// The newly discovered address.
        address: Multiaddr,
    },
}
291
292impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
293    /// Extract the `TBehaviourOutEvent` from this [`SwarmEvent`] in case it is the `Behaviour`
294    /// variant, otherwise fail.
295    #[allow(clippy::result_large_err)]
296    pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
297        match self {
298            SwarmEvent::Behaviour(inner) => Ok(inner),
299            other => Err(other),
300        }
301    }
302}
303
/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// [`Transport`] for dialing remote peers and listening for incoming connections.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,

    /// The nodes currently active.
    pool: Pool<THandler<TBehaviour>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
    /// handlers.
    behaviour: TBehaviour,

    /// List of protocols that the behaviour says it supports.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// External addresses of the local node that have been confirmed, see
    /// [`Swarm::add_external_address`] and [`Swarm::remove_external_address`].
    confirmed_external_addr: HashSet<Multiaddr>,

    /// Multiaddresses that our listeners are listening on, keyed by listener.
    listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,

    /// Pending event to be delivered to connection handlers
    /// (or dropped if the peer disconnected) before the `behaviour`
    /// can be polled again.
    pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

    /// Swarm events that have been queued (e.g. while handling pool events) but not yet
    /// emitted.
    // NOTE(review): presumably drained when the swarm is polled; the consumer is not visible here.
    pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
}
340
// `Swarm` is declared `Unpin` for every `NetworkBehaviour`, without requiring
// `TBehaviour: Unpin`.
impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
342
343impl<TBehaviour> Swarm<TBehaviour>
344where
345    TBehaviour: NetworkBehaviour,
346{
347    /// Creates a new [`Swarm`] from the given [`Transport`], [`NetworkBehaviour`], [`PeerId`] and
348    /// [`Config`].
349    pub fn new(
350        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
351        behaviour: TBehaviour,
352        local_peer_id: PeerId,
353        config: Config,
354    ) -> Self {
355        tracing::info!(%local_peer_id);
356
357        Swarm {
358            local_peer_id,
359            transport,
360            pool: Pool::new(local_peer_id, config.pool_config),
361            behaviour,
362            supported_protocols: Default::default(),
363            confirmed_external_addr: Default::default(),
364            listened_addrs: HashMap::new(),
365            pending_handler_event: None,
366            pending_swarm_events: VecDeque::default(),
367        }
368    }
369
370    /// Returns information about the connections underlying the [`Swarm`].
371    pub fn network_info(&self) -> NetworkInfo {
372        let num_peers = self.pool.num_peers();
373        let connection_counters = self.pool.counters().clone();
374        NetworkInfo {
375            num_peers,
376            connection_counters,
377        }
378    }
379
380    /// Starts listening on the given address.
381    /// Returns an error if the address is not supported.
382    ///
383    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
384    /// Depending on the underlying transport, one listener may have multiple listening addresses.
385    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
386        let opts = ListenOpts::new(addr);
387        let id = opts.listener_id();
388        self.add_listener(opts)?;
389        Ok(id)
390    }
391
    /// Removes the listener with the given ID from the underlying transport.
    ///
    /// Returns `true` if there was a listener with this ID, `false`
    /// otherwise.
    pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
        self.transport.remove_listener(listener_id)
    }
399
400    /// Dial a known or unknown peer.
401    ///
402    /// See also [`DialOpts`].
403    ///
404    /// ```
405    /// # use libp2p_swarm::Swarm;
406    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
407    /// # use ant_libp2p_core::{Multiaddr, Transport};
408    /// # use ant_libp2p_core::transport::dummy::DummyTransport;
409    /// # use libp2p_swarm::dummy;
410    /// # use libp2p_identity::PeerId;
411    /// #
412    /// # #[tokio::main]
413    /// # async fn main() {
414    /// let mut swarm = build_swarm();
415    ///
416    /// // Dial a known peer.
417    /// swarm.dial(PeerId::random());
418    ///
419    /// // Dial an unknown peer.
420    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
421    /// # }
422    ///
423    /// # fn build_swarm() -> Swarm<dummy::Behaviour> {
424    /// #     Swarm::new(DummyTransport::new().boxed(), dummy::Behaviour, PeerId::random(), libp2p_swarm::Config::with_tokio_executor())
425    /// # }
426    /// ```
427    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
428        let dial_opts = opts.into();
429
430        let peer_id = dial_opts.get_peer_id();
431        let condition = dial_opts.peer_condition();
432        let connection_id = dial_opts.connection_id();
433
434        let should_dial = match (condition, peer_id) {
435            (_, None) => true,
436            (PeerCondition::Always, _) => true,
437            (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
438            (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
439            (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
440                !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
441            }
442        };
443
444        if !should_dial {
445            let e = DialError::DialPeerConditionFalse(condition);
446
447            self.behaviour
448                .on_swarm_event(FromSwarm::DialFailure(DialFailure {
449                    peer_id,
450                    error: &e,
451                    connection_id,
452                }));
453
454            return Err(e);
455        }
456
457        let addresses = {
458            let mut addresses_from_opts = dial_opts.get_addresses();
459
460            match self.behaviour.handle_pending_outbound_connection(
461                connection_id,
462                peer_id,
463                addresses_from_opts.as_slice(),
464                dial_opts.role_override(),
465            ) {
466                Ok(addresses) => {
467                    if dial_opts.extend_addresses_through_behaviour() {
468                        addresses_from_opts.extend(addresses)
469                    } else {
470                        let num_addresses = addresses.len();
471
472                        if num_addresses > 0 {
473                            tracing::debug!(
474                                connection=%connection_id,
475                                discarded_addresses_count=%num_addresses,
476                                "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
477                            )
478                        }
479                    }
480                }
481                Err(cause) => {
482                    let error = DialError::Denied { cause };
483
484                    self.behaviour
485                        .on_swarm_event(FromSwarm::DialFailure(DialFailure {
486                            peer_id,
487                            error: &error,
488                            connection_id,
489                        }));
490
491                    return Err(error);
492                }
493            }
494
495            let mut unique_addresses = HashSet::new();
496            addresses_from_opts.retain(|addr| {
497                !self.listened_addrs.values().flatten().any(|a| a == addr)
498                    && unique_addresses.insert(addr.clone())
499            });
500
501            if addresses_from_opts.is_empty() {
502                let error = DialError::NoAddresses;
503                self.behaviour
504                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
505                        peer_id,
506                        error: &error,
507                        connection_id,
508                    }));
509                return Err(error);
510            };
511
512            addresses_from_opts
513        };
514
515        let dials = addresses
516            .into_iter()
517            .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
518                Ok(address) => {
519                    let dial = self.transport.dial(
520                        address.clone(),
521                        transport::DialOpts {
522                            role: dial_opts.role_override(),
523                            port_use: dial_opts.port_use(),
524                        },
525                    );
526                    let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
527                    span.follows_from(tracing::Span::current());
528                    match dial {
529                        Ok(fut) => fut
530                            .map(|r| (address, r.map_err(TransportError::Other)))
531                            .instrument(span)
532                            .boxed(),
533                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
534                    }
535                }
536                Err(address) => futures::future::ready((
537                    address.clone(),
538                    Err(TransportError::MultiaddrNotSupported(address)),
539                ))
540                .boxed(),
541            })
542            .collect();
543
544        self.pool.add_outgoing(
545            dials,
546            peer_id,
547            dial_opts.role_override(),
548            dial_opts.port_use(),
549            dial_opts.dial_concurrency_override(),
550            connection_id,
551        );
552
553        Ok(())
554    }
555
556    /// Returns an iterator that produces the list of addresses we're listening on.
557    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
558        self.listened_addrs.values().flatten()
559    }
560
    /// Returns the [`PeerId`] of the local node, i.e. the `local_peer_id` that was handed to
    /// [`Swarm::new`].
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }
565
    /// Lists all **confirmed** external addresses of the local node.
    ///
    /// The addresses are stored in a `HashSet`, so the iteration order is unspecified.
    pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
        self.confirmed_external_addr.iter()
    }
570
571    fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
572        let addr = opts.address();
573        let listener_id = opts.listener_id();
574
575        if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
576            self.behaviour
577                .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
578                    listener_id,
579                    err: &e,
580                }));
581
582            return Err(e);
583        }
584
585        self.behaviour
586            .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
587                listener_id,
588            }));
589
590        Ok(())
591    }
592
593    /// Add a **confirmed** external address for the local node.
594    ///
595    /// This function should only be called with addresses that are guaranteed to be reachable.
596    /// The address is broadcast to all [`NetworkBehaviour`]s via
597    /// [`FromSwarm::ExternalAddrConfirmed`].
598    pub fn add_external_address(&mut self, a: Multiaddr) {
599        self.behaviour
600            .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
601                addr: &a,
602            }));
603        self.confirmed_external_addr.insert(a);
604    }
605
606    /// Remove an external address for the local node.
607    ///
608    /// The address is broadcast to all [`NetworkBehaviour`]s via
609    /// [`FromSwarm::ExternalAddrExpired`].
610    pub fn remove_external_address(&mut self, addr: &Multiaddr) {
611        self.behaviour
612            .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
613        self.confirmed_external_addr.remove(addr);
614    }
615
616    /// Add a new external address of a remote peer.
617    ///
618    /// The address is broadcast to all [`NetworkBehaviour`]s via
619    /// [`FromSwarm::NewExternalAddrOfPeer`].
620    pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
621        self.behaviour
622            .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
623                peer_id,
624                addr: &addr,
625            }))
626    }
627
628    /// Disconnects a peer by its peer ID, closing all connections to said peer.
629    ///
630    /// Returns `Ok(())` if there was one or more established connections to the peer.
631    ///
632    /// Closing a connection via [`Swarm::disconnect_peer_id`] will poll
633    /// [`ConnectionHandler::poll_close`] to completion. Use this function if you want to close
634    /// a connection _despite_ it still being in use by one or more handlers.
635    #[allow(clippy::result_unit_err)]
636    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
637        let was_connected = self.pool.is_connected(peer_id);
638        self.pool.disconnect(peer_id);
639
640        if was_connected {
641            Ok(())
642        } else {
643            Err(())
644        }
645    }
646
647    /// Attempt to gracefully close a connection.
648    ///
649    /// Closing a connection is asynchronous but this function will return immediately.
650    /// A [`SwarmEvent::ConnectionClosed`] event will be emitted
651    /// once the connection is actually closed.
652    ///
653    /// # Returns
654    ///
655    /// - `true` if the connection was established and is now being closed.
656    /// - `false` if the connection was not found or is no longer established.
657    pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
658        if let Some(established) = self.pool.get_established(connection_id) {
659            established.start_close();
660            return true;
661        }
662
663        false
664    }
665
    /// Checks whether there is an established connection to a peer.
    ///
    /// The answer comes straight from the connection pool.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }
670
    /// Returns an iterator over the currently connected peers, as tracked by the connection
    /// pool.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }
675
    /// Returns a shared reference to the [`NetworkBehaviour`] owned by this swarm.
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }
680
    /// Returns a mutable reference to the [`NetworkBehaviour`] owned by this swarm.
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
685
686    fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
687        match event {
688            PoolEvent::ConnectionEstablished {
689                peer_id,
690                id,
691                endpoint,
692                connection,
693                concurrent_dial_errors,
694                established_in,
695            } => {
696                let handler = match endpoint.clone() {
697                    ConnectedPoint::Dialer {
698                        address,
699                        role_override,
700                        port_use,
701                    } => {
702                        match self.behaviour.handle_established_outbound_connection(
703                            id,
704                            peer_id,
705                            &address,
706                            role_override,
707                            port_use,
708                        ) {
709                            Ok(handler) => handler,
710                            Err(cause) => {
711                                let dial_error = DialError::Denied { cause };
712                                self.behaviour.on_swarm_event(FromSwarm::DialFailure(
713                                    DialFailure {
714                                        connection_id: id,
715                                        error: &dial_error,
716                                        peer_id: Some(peer_id),
717                                    },
718                                ));
719
720                                self.pending_swarm_events.push_back(
721                                    SwarmEvent::OutgoingConnectionError {
722                                        peer_id: Some(peer_id),
723                                        connection_id: id,
724                                        error: dial_error,
725                                    },
726                                );
727                                return;
728                            }
729                        }
730                    }
731                    ConnectedPoint::Listener {
732                        local_addr,
733                        send_back_addr,
734                    } => {
735                        match self.behaviour.handle_established_inbound_connection(
736                            id,
737                            peer_id,
738                            &local_addr,
739                            &send_back_addr,
740                        ) {
741                            Ok(handler) => handler,
742                            Err(cause) => {
743                                let listen_error = ListenError::Denied { cause };
744                                self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
745                                    ListenFailure {
746                                        local_addr: &local_addr,
747                                        send_back_addr: &send_back_addr,
748                                        error: &listen_error,
749                                        connection_id: id,
750                                        peer_id: Some(peer_id),
751                                    },
752                                ));
753
754                                self.pending_swarm_events.push_back(
755                                    SwarmEvent::IncomingConnectionError {
756                                        connection_id: id,
757                                        send_back_addr,
758                                        local_addr,
759                                        error: listen_error,
760                                    },
761                                );
762                                return;
763                            }
764                        }
765                    }
766                };
767
768                let supported_protocols = handler
769                    .listen_protocol()
770                    .upgrade()
771                    .protocol_info()
772                    .map(|p| p.as_ref().as_bytes().to_vec())
773                    .collect();
774                let other_established_connection_ids = self
775                    .pool
776                    .iter_established_connections_of_peer(&peer_id)
777                    .collect::<Vec<_>>();
778                let num_established = NonZeroU32::new(
779                    u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
780                )
781                .expect("n + 1 is always non-zero; qed");
782
783                self.pool
784                    .spawn_connection(id, peer_id, &endpoint, connection, handler);
785
786                tracing::debug!(
787                    peer=%peer_id,
788                    ?endpoint,
789                    total_peers=%num_established,
790                    "Connection established"
791                );
792                let failed_addresses = concurrent_dial_errors
793                    .as_ref()
794                    .map(|es| {
795                        es.iter()
796                            .map(|(a, _)| a)
797                            .cloned()
798                            .collect::<Vec<Multiaddr>>()
799                    })
800                    .unwrap_or_default();
801                self.behaviour
802                    .on_swarm_event(FromSwarm::ConnectionEstablished(
803                        behaviour::ConnectionEstablished {
804                            peer_id,
805                            connection_id: id,
806                            endpoint: &endpoint,
807                            failed_addresses: &failed_addresses,
808                            other_established: other_established_connection_ids.len(),
809                        },
810                    ));
811                self.supported_protocols = supported_protocols;
812                self.pending_swarm_events
813                    .push_back(SwarmEvent::ConnectionEstablished {
814                        peer_id,
815                        connection_id: id,
816                        num_established,
817                        endpoint,
818                        concurrent_dial_errors,
819                        established_in,
820                    });
821            }
822            PoolEvent::PendingOutboundConnectionError {
823                id: connection_id,
824                error,
825                peer,
826            } => {
827                let error = error.into();
828
829                self.behaviour
830                    .on_swarm_event(FromSwarm::DialFailure(DialFailure {
831                        peer_id: peer,
832                        error: &error,
833                        connection_id,
834                    }));
835
836                if let Some(peer) = peer {
837                    tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
838                } else {
839                    tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
840                }
841
842                self.pending_swarm_events
843                    .push_back(SwarmEvent::OutgoingConnectionError {
844                        peer_id: peer,
845                        connection_id,
846                        error,
847                    });
848            }
849            PoolEvent::PendingInboundConnectionError {
850                id,
851                send_back_addr,
852                local_addr,
853                error,
854            } => {
855                let error = error.into();
856
857                tracing::debug!("Incoming connection failed: {:?}", error);
858                self.behaviour
859                    .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
860                        local_addr: &local_addr,
861                        send_back_addr: &send_back_addr,
862                        error: &error,
863                        connection_id: id,
864                        peer_id: None,
865                    }));
866                self.pending_swarm_events
867                    .push_back(SwarmEvent::IncomingConnectionError {
868                        connection_id: id,
869                        local_addr,
870                        send_back_addr,
871                        error,
872                    });
873            }
874            PoolEvent::ConnectionClosed {
875                id,
876                connected,
877                error,
878                remaining_established_connection_ids,
879                ..
880            } => {
881                if let Some(error) = error.as_ref() {
882                    tracing::debug!(
883                        total_peers=%remaining_established_connection_ids.len(),
884                        "Connection closed with error {:?}: {:?}",
885                        error,
886                        connected,
887                    );
888                } else {
889                    tracing::debug!(
890                        total_peers=%remaining_established_connection_ids.len(),
891                        "Connection closed: {:?}",
892                        connected
893                    );
894                }
895                let peer_id = connected.peer_id;
896                let endpoint = connected.endpoint;
897                let num_established =
898                    u32::try_from(remaining_established_connection_ids.len()).unwrap();
899
900                self.behaviour
901                    .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
902                        peer_id,
903                        connection_id: id,
904                        endpoint: &endpoint,
905                        cause: error.as_ref(),
906                        remaining_established: num_established as usize,
907                    }));
908                self.pending_swarm_events
909                    .push_back(SwarmEvent::ConnectionClosed {
910                        peer_id,
911                        connection_id: id,
912                        endpoint,
913                        cause: error,
914                        num_established,
915                    });
916            }
917            PoolEvent::ConnectionEvent { peer_id, id, event } => {
918                self.behaviour
919                    .on_connection_handler_event(peer_id, id, event);
920            }
921            PoolEvent::AddressChange {
922                peer_id,
923                id,
924                new_endpoint,
925                old_endpoint,
926            } => {
927                self.behaviour
928                    .on_swarm_event(FromSwarm::AddressChange(AddressChange {
929                        peer_id,
930                        connection_id: id,
931                        old: &old_endpoint,
932                        new: &new_endpoint,
933                    }));
934            }
935        }
936    }
937
938    fn handle_transport_event(
939        &mut self,
940        event: TransportEvent<
941            <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
942            io::Error,
943        >,
944    ) {
945        match event {
946            TransportEvent::Incoming {
947                listener_id: _,
948                upgrade,
949                local_addr,
950                send_back_addr,
951            } => {
952                let connection_id = ConnectionId::next();
953
954                match self.behaviour.handle_pending_inbound_connection(
955                    connection_id,
956                    &local_addr,
957                    &send_back_addr,
958                ) {
959                    Ok(()) => {}
960                    Err(cause) => {
961                        let listen_error = ListenError::Denied { cause };
962
963                        self.behaviour
964                            .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
965                                local_addr: &local_addr,
966                                send_back_addr: &send_back_addr,
967                                error: &listen_error,
968                                connection_id,
969                                peer_id: None,
970                            }));
971
972                        self.pending_swarm_events
973                            .push_back(SwarmEvent::IncomingConnectionError {
974                                connection_id,
975                                local_addr,
976                                send_back_addr,
977                                error: listen_error,
978                            });
979                        return;
980                    }
981                }
982
983                self.pool.add_incoming(
984                    upgrade,
985                    IncomingInfo {
986                        local_addr: &local_addr,
987                        send_back_addr: &send_back_addr,
988                    },
989                    connection_id,
990                );
991
992                self.pending_swarm_events
993                    .push_back(SwarmEvent::IncomingConnection {
994                        connection_id,
995                        local_addr,
996                        send_back_addr,
997                    })
998            }
999            TransportEvent::NewAddress {
1000                listener_id,
1001                listen_addr,
1002            } => {
1003                tracing::debug!(
1004                    listener=?listener_id,
1005                    address=%listen_addr,
1006                    "New listener address"
1007                );
1008                let addrs = self.listened_addrs.entry(listener_id).or_default();
1009                if !addrs.contains(&listen_addr) {
1010                    addrs.push(listen_addr.clone())
1011                }
1012                self.behaviour
1013                    .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1014                        listener_id,
1015                        addr: &listen_addr,
1016                    }));
1017                self.pending_swarm_events
1018                    .push_back(SwarmEvent::NewListenAddr {
1019                        listener_id,
1020                        address: listen_addr,
1021                    })
1022            }
1023            TransportEvent::AddressExpired {
1024                listener_id,
1025                listen_addr,
1026            } => {
1027                tracing::debug!(
1028                    listener=?listener_id,
1029                    address=%listen_addr,
1030                    "Expired listener address"
1031                );
1032                if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1033                    addrs.retain(|a| a != &listen_addr);
1034                }
1035                self.behaviour
1036                    .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1037                        listener_id,
1038                        addr: &listen_addr,
1039                    }));
1040                self.pending_swarm_events
1041                    .push_back(SwarmEvent::ExpiredListenAddr {
1042                        listener_id,
1043                        address: listen_addr,
1044                    })
1045            }
1046            TransportEvent::ListenerClosed {
1047                listener_id,
1048                reason,
1049            } => {
1050                tracing::debug!(
1051                    listener=?listener_id,
1052                    ?reason,
1053                    "Listener closed"
1054                );
1055                let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1056                for addr in addrs.iter() {
1057                    self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1058                        ExpiredListenAddr { listener_id, addr },
1059                    ));
1060                }
1061                self.behaviour
1062                    .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1063                        listener_id,
1064                        reason: reason.as_ref().copied(),
1065                    }));
1066                self.pending_swarm_events
1067                    .push_back(SwarmEvent::ListenerClosed {
1068                        listener_id,
1069                        addresses: addrs.to_vec(),
1070                        reason,
1071                    })
1072            }
1073            TransportEvent::ListenerError { listener_id, error } => {
1074                self.behaviour
1075                    .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1076                        listener_id,
1077                        err: &error,
1078                    }));
1079                self.pending_swarm_events
1080                    .push_back(SwarmEvent::ListenerError { listener_id, error })
1081            }
1082        }
1083    }
1084
1085    fn handle_behaviour_event(
1086        &mut self,
1087        event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1088    ) {
1089        match event {
1090            ToSwarm::GenerateEvent(event) => {
1091                self.pending_swarm_events
1092                    .push_back(SwarmEvent::Behaviour(event));
1093            }
1094            ToSwarm::Dial { opts } => {
1095                let peer_id = opts.get_peer_id();
1096                let connection_id = opts.connection_id();
1097                if let Ok(()) = self.dial(opts) {
1098                    self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1099                        peer_id,
1100                        connection_id,
1101                    });
1102                }
1103            }
1104            ToSwarm::ListenOn { opts } => {
1105                // Error is dispatched internally, safe to ignore.
1106                let _ = self.add_listener(opts);
1107            }
1108            ToSwarm::RemoveListener { id } => {
1109                self.remove_listener(id);
1110            }
1111            ToSwarm::NotifyHandler {
1112                peer_id,
1113                handler,
1114                event,
1115            } => {
1116                assert!(self.pending_handler_event.is_none());
1117                let handler = match handler {
1118                    NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1119                    NotifyHandler::Any => {
1120                        let ids = self
1121                            .pool
1122                            .iter_established_connections_of_peer(&peer_id)
1123                            .collect();
1124                        PendingNotifyHandler::Any(ids)
1125                    }
1126                };
1127
1128                self.pending_handler_event = Some((peer_id, handler, event));
1129            }
1130            ToSwarm::NewExternalAddrCandidate(addr) => {
1131                if !self.confirmed_external_addr.contains(&addr) {
1132                    self.behaviour
1133                        .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1134                            NewExternalAddrCandidate { addr: &addr },
1135                        ));
1136                    self.pending_swarm_events
1137                        .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1138                }
1139            }
1140            ToSwarm::ExternalAddrConfirmed(addr) => {
1141                self.add_external_address(addr.clone());
1142                self.pending_swarm_events
1143                    .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1144            }
1145            ToSwarm::ExternalAddrExpired(addr) => {
1146                self.remove_external_address(&addr);
1147                self.pending_swarm_events
1148                    .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1149            }
1150            ToSwarm::CloseConnection {
1151                peer_id,
1152                connection,
1153            } => match connection {
1154                CloseConnection::One(connection_id) => {
1155                    if let Some(conn) = self.pool.get_established(connection_id) {
1156                        conn.start_close();
1157                    }
1158                }
1159                CloseConnection::All => {
1160                    self.pool.disconnect(peer_id);
1161                }
1162            },
1163            ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1164                self.behaviour
1165                    .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1166                        peer_id,
1167                        addr: &address,
1168                    }));
1169                self.pending_swarm_events
1170                    .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1171            }
1172        }
1173    }
1174
1175    /// Internal function used by everything event-related.
1176    ///
1177    /// Polls the `Swarm` for the next event.
1178    #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1179    fn poll_next_event(
1180        mut self: Pin<&mut Self>,
1181        cx: &mut Context<'_>,
1182    ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1183        // We use a `this` variable because the compiler can't mutably borrow multiple times
1184        // across a `Deref`.
1185        let this = &mut *self;
1186
1187        // This loop polls the components below in a prioritized order.
1188        //
1189        // 1. [`NetworkBehaviour`]
1190        // 2. Connection [`Pool`]
1191        // 3. [`ListenersStream`]
1192        //
1193        // (1) is polled before (2) to prioritize local work over work coming from a remote.
1194        //
1195        // (2) is polled before (3) to prioritize existing connections
1196        // over upgrading new incoming connections.
1197        loop {
1198            if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1199                return Poll::Ready(swarm_event);
1200            }
1201
1202            match this.pending_handler_event.take() {
1203                // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the
1204                // previous iteration to the connection handler(s).
1205                Some((peer_id, handler, event)) => match handler {
1206                    PendingNotifyHandler::One(conn_id) => {
1207                        match this.pool.get_established(conn_id) {
1208                            Some(conn) => match notify_one(conn, event, cx) {
1209                                None => continue,
1210                                Some(event) => {
1211                                    this.pending_handler_event = Some((peer_id, handler, event));
1212                                }
1213                            },
1214                            None => continue,
1215                        }
1216                    }
1217                    PendingNotifyHandler::Any(ids) => {
1218                        match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1219                            None => continue,
1220                            Some((event, ids)) => {
1221                                let handler = PendingNotifyHandler::Any(ids);
1222                                this.pending_handler_event = Some((peer_id, handler, event));
1223                            }
1224                        }
1225                    }
1226                },
1227                // No pending event. Allow the [`NetworkBehaviour`] to make progress.
1228                None => match this.behaviour.poll(cx) {
1229                    Poll::Pending => {}
1230                    Poll::Ready(behaviour_event) => {
1231                        this.handle_behaviour_event(behaviour_event);
1232
1233                        continue;
1234                    }
1235                },
1236            }
1237
1238            // Poll the known peers.
1239            match this.pool.poll(cx) {
1240                Poll::Pending => {}
1241                Poll::Ready(pool_event) => {
1242                    this.handle_pool_event(pool_event);
1243                    continue;
1244                }
1245            }
1246
1247            // Poll the listener(s) for new connections.
1248            match Pin::new(&mut this.transport).poll(cx) {
1249                Poll::Pending => {}
1250                Poll::Ready(transport_event) => {
1251                    this.handle_transport_event(transport_event);
1252                    continue;
1253                }
1254            }
1255
1256            return Poll::Pending;
1257        }
1258    }
1259}
1260
/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    /// Deliver the event to the handler of exactly this connection.
    One(ConnectionId),
    /// Deliver the event to the handler of any one of these connections.
    Any(SmallVec<[ConnectionId; 10]>),
}
1271
1272/// Notify a single connection of an event.
1273///
1274/// Returns `Some` with the given event if the connection is not currently
1275/// ready to receive another event, in which case the current task is
1276/// scheduled to be woken up.
1277///
1278/// Returns `None` if the connection is closing or the event has been
1279/// successfully sent, in either case the event is consumed.
1280fn notify_one<THandlerInEvent>(
1281    conn: &mut EstablishedConnection<THandlerInEvent>,
1282    event: THandlerInEvent,
1283    cx: &mut Context<'_>,
1284) -> Option<THandlerInEvent> {
1285    match conn.poll_ready_notify_handler(cx) {
1286        Poll::Pending => Some(event),
1287        Poll::Ready(Err(())) => None, // connection is closing
1288        Poll::Ready(Ok(())) => {
1289            // Can now only fail if connection is closing.
1290            let _ = conn.notify_handler(event);
1291            None
1292        }
1293    }
1294}
1295
1296/// Notify any one of a given list of connections of a peer of an event.
1297///
1298/// Returns `Some` with the given event and a new list of connections if
1299/// none of the given connections was able to receive the event but at
1300/// least one of them is not closing, in which case the current task
1301/// is scheduled to be woken up. The returned connections are those which
1302/// may still become ready to receive another event.
1303///
1304/// Returns `None` if either all connections are closing or the event
1305/// was successfully sent to a handler, in either case the event is consumed.
1306fn notify_any<THandler, TBehaviour>(
1307    ids: SmallVec<[ConnectionId; 10]>,
1308    pool: &mut Pool<THandler>,
1309    event: THandlerInEvent<TBehaviour>,
1310    cx: &mut Context<'_>,
1311) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1312where
1313    TBehaviour: NetworkBehaviour,
1314    THandler: ConnectionHandler<
1315        FromBehaviour = THandlerInEvent<TBehaviour>,
1316        ToBehaviour = THandlerOutEvent<TBehaviour>,
1317    >,
1318{
1319    let mut pending = SmallVec::new();
1320    let mut event = Some(event); // (1)
1321    for id in ids.into_iter() {
1322        if let Some(conn) = pool.get_established(id) {
1323            match conn.poll_ready_notify_handler(cx) {
1324                Poll::Pending => pending.push(id),
1325                Poll::Ready(Err(())) => {} // connection is closing
1326                Poll::Ready(Ok(())) => {
1327                    let e = event.take().expect("by (1),(2)");
1328                    if let Err(e) = conn.notify_handler(e) {
1329                        event = Some(e) // (2)
1330                    } else {
1331                        break;
1332                    }
1333                }
1334            }
1335        }
1336    }
1337
1338    event.and_then(|e| {
1339        if !pending.is_empty() {
1340            Some((e, pending))
1341        } else {
1342            None
1343        }
1344    })
1345}
1346
1347/// Stream of events returned by [`Swarm`].
1348///
1349/// Includes events from the [`NetworkBehaviour`] as well as events about
1350/// connection and listener status. See [`SwarmEvent`] for details.
1351///
1352/// Note: This stream is infinite and it is guaranteed that
1353/// [`futures::Stream::poll_next`] will never return `Poll::Ready(None)`.
1354impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1355where
1356    TBehaviour: NetworkBehaviour,
1357{
1358    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1359
1360    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1361        self.as_mut().poll_next_event(cx).map(Some)
1362    }
1363}
1364
/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Always returns `false`.
    fn is_terminated(&self) -> bool {
        // `poll_next` wraps every event in `Some`, so the stream never ends.
        false
    }
}
1374
/// Configuration of a [`Swarm`].
///
/// A value is created with one of the executor constructors (for example
/// [`Config::with_executor`]) and can then be adjusted with the builder-style
/// `with_*` methods.
pub struct Config {
    // Every option set through the builder methods is stored in here.
    pool_config: PoolConfig,
}
1378
1379impl Config {
1380    /// Creates a new [`Config`] from the given executor. The [`Swarm`] is obtained via
1381    /// [`Swarm::new`].
1382    pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1383        Self {
1384            pool_config: PoolConfig::new(Some(Box::new(executor))),
1385        }
1386    }
1387
1388    #[doc(hidden)]
1389    /// Used on connection benchmarks.
1390    pub fn without_executor() -> Self {
1391        Self {
1392            pool_config: PoolConfig::new(None),
1393        }
1394    }
1395
1396    /// Sets executor to the `wasm` executor.
1397    /// Background tasks will be executed by the browser on the next micro-tick.
1398    ///
1399    /// Spawning a task is similar too:
1400    /// ```typescript
1401    /// function spawn(task: () => Promise<void>) {
1402    ///     task()
1403    /// }
1404    /// ```
1405    #[cfg(feature = "wasm-bindgen")]
1406    pub fn with_wasm_executor() -> Self {
1407        Self::with_executor(crate::executor::WasmBindgenExecutor)
1408    }
1409
1410    /// Builds a new [`Config`] from the given `tokio` executor.
1411    #[cfg(all(
1412        feature = "tokio",
1413        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1414    ))]
1415    pub fn with_tokio_executor() -> Self {
1416        Self::with_executor(crate::executor::TokioExecutor)
1417    }
1418
1419    /// Builds a new [`Config`] from the given `async-std` executor.
1420    #[cfg(all(
1421        feature = "async-std",
1422        not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1423    ))]
1424    pub fn with_async_std_executor() -> Self {
1425        Self::with_executor(crate::executor::AsyncStdExecutor)
1426    }
1427
1428    /// Configures the number of events from the [`NetworkBehaviour`] in
1429    /// destination to the [`ConnectionHandler`] that can be buffered before
1430    /// the [`Swarm`] has to wait. An individual buffer with this number of
1431    /// events exists for each individual connection.
1432    ///
1433    /// The ideal value depends on the executor used, the CPU speed, and the
1434    /// volume of events. If this value is too low, then the [`Swarm`] will
1435    /// be sleeping more often than necessary. Increasing this value increases
1436    /// the overall memory usage.
1437    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1438        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1439        self
1440    }
1441
1442    /// Configures the size of the buffer for events sent by a [`ConnectionHandler`] to the
1443    /// [`NetworkBehaviour`].
1444    ///
1445    /// Each connection has its own buffer.
1446    ///
1447    /// The ideal value depends on the executor used, the CPU speed and the volume of events.
1448    /// If this value is too low, then the [`ConnectionHandler`]s will be sleeping more often
1449    /// than necessary. Increasing this value increases the overall memory
1450    /// usage, and more importantly the latency between the moment when an
1451    /// event is emitted and the moment when it is received by the
1452    /// [`NetworkBehaviour`].
1453    pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1454        self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1455        self
1456    }
1457
1458    /// Number of addresses concurrently dialed for a single outbound connection attempt.
1459    pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1460        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1461        self
1462    }
1463
1464    /// Configures an override for the substream upgrade protocol to use.
1465    ///
1466    /// The subtream upgrade protocol is the multistream-select protocol
1467    /// used for protocol negotiation on substreams. Since a listener
1468    /// supports all existing versions, the choice of upgrade protocol
1469    /// only effects the "dialer", i.e. the peer opening a substream.
1470    ///
1471    /// > **Note**: If configured, specific upgrade protocols for
1472    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
1473    /// > are ignored.
1474    pub fn with_substream_upgrade_protocol_override(
1475        mut self,
1476        v: ant_libp2p_core::upgrade::Version,
1477    ) -> Self {
1478        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1479        self
1480    }
1481
1482    /// The maximum number of inbound streams concurrently negotiating on a
1483    /// connection. New inbound streams exceeding the limit are dropped and thus
1484    /// reset.
1485    ///
1486    /// Note: This only enforces a limit on the number of concurrently
1487    /// negotiating inbound streams. The total number of inbound streams on a
1488    /// connection is the sum of negotiating and negotiated streams. A limit on
1489    /// the total number of streams can be enforced at the
1490    /// [`StreamMuxerBox`] level.
1491    pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1492        self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1493        self
1494    }
1495
1496    /// How long to keep a connection alive once it is idling.
1497    ///
1498    /// Defaults to 0.
1499    pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1500        self.pool_config.idle_connection_timeout = timeout;
1501        self
1502    }
1503}
1504
/// Possible errors when trying to establish or upgrade an outbound connection.
#[derive(Debug)]
pub enum DialError {
    /// The peer identity obtained on the connection matches the local peer.
    LocalPeerId {
        /// Endpoint of the connection on which the local peer id was obtained.
        endpoint: ConnectedPoint,
    },
    /// No addresses have been provided by [`NetworkBehaviour::handle_pending_outbound_connection`]
    /// and [`DialOpts`].
    NoAddresses,
    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
    /// the dial was aborted.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    WrongPeerId {
        /// The peer identity that was obtained instead of the expected one.
        obtained: PeerId,
        /// Endpoint of the connection on which the unexpected identity was obtained.
        endpoint: ConnectedPoint,
    },
    /// One of the [`NetworkBehaviour`]s rejected the outbound connection
    /// via [`NetworkBehaviour::handle_pending_outbound_connection`] or
    /// [`NetworkBehaviour::handle_established_outbound_connection`].
    Denied {
        /// The reason given for the rejection.
        cause: ConnectionDenied,
    },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    ///
    /// Holds pairs of an address and the transport error associated with it.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}
1530
1531impl From<PendingOutboundConnectionError> for DialError {
1532    fn from(error: PendingOutboundConnectionError) -> Self {
1533        match error {
1534            PendingConnectionError::Aborted => DialError::Aborted,
1535            PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1536                DialError::WrongPeerId { obtained, endpoint }
1537            }
1538            PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1539            PendingConnectionError::Transport(e) => DialError::Transport(e),
1540        }
1541    }
1542}
1543
1544impl fmt::Display for DialError {
1545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1546        match self {
1547            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1548            DialError::LocalPeerId { endpoint } => write!(
1549                f,
1550                "Dial error: tried to dial local peer id at {endpoint:?}."
1551            ),
1552            DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1553            DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1554            DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1555            DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1556            DialError::Aborted => write!(
1557                f,
1558                "Dial error: Pending connection attempt has been aborted."
1559            ),
1560            DialError::WrongPeerId { obtained, endpoint } => write!(
1561                f,
1562                "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1563            ),
1564            DialError::Transport(errors) => {
1565                write!(f, "Failed to negotiate transport protocol(s): [")?;
1566
1567                for (addr, error) in errors {
1568                    write!(f, "({addr}")?;
1569                    print_error_chain(f, error)?;
1570                    write!(f, ")")?;
1571                }
1572                write!(f, "]")?;
1573
1574                Ok(())
1575            }
1576            DialError::Denied { .. } => {
1577                write!(f, "Dial error")
1578            }
1579        }
1580    }
1581}
1582
/// Writes `e` followed by every error reachable through `source()`, each one
/// prefixed with `": "`.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    let mut current = e;
    loop {
        write!(f, ": {current}")?;

        // Walk down the chain until an error reports no further source.
        match current.source() {
            Some(cause) => current = cause,
            None => return Ok(()),
        }
    }
}
1592
1593impl error::Error for DialError {
1594    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1595        match self {
1596            DialError::LocalPeerId { .. } => None,
1597            DialError::NoAddresses => None,
1598            DialError::DialPeerConditionFalse(_) => None,
1599            DialError::Aborted => None,
1600            DialError::WrongPeerId { .. } => None,
1601            DialError::Transport(_) => None,
1602            DialError::Denied { cause } => Some(cause),
1603        }
1604    }
1605}
1606
/// Possible errors when upgrading an inbound connection.
///
/// Converted from [`PendingInboundConnectionError`] via `From`; the `Denied` variant has
/// no counterpart in that conversion.
#[derive(Debug)]
pub enum ListenError {
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The peer identity obtained on the connection did not match the one that was expected.
    ///
    /// `obtained` is the identity that was obtained and `endpoint` is the
    /// [`ConnectedPoint`] of the affected connection.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// The connection was dropped because it resolved to our own [`PeerId`].
    LocalPeerId {
        endpoint: ConnectedPoint,
    },
    /// The connection was denied.
    ///
    /// `cause` holds the [`ConnectionDenied`] describing why; it is also exposed through
    /// [`error::Error::source`].
    Denied {
        cause: ConnectionDenied,
    },
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    Transport(TransportError<io::Error>),
}
1627
1628impl From<PendingInboundConnectionError> for ListenError {
1629    fn from(error: PendingInboundConnectionError) -> Self {
1630        match error {
1631            PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1632            PendingInboundConnectionError::Aborted => ListenError::Aborted,
1633            PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1634                ListenError::WrongPeerId { obtained, endpoint }
1635            }
1636            PendingInboundConnectionError::LocalPeerId { endpoint } => {
1637                ListenError::LocalPeerId { endpoint }
1638            }
1639        }
1640    }
1641}
1642
1643impl fmt::Display for ListenError {
1644    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1645        match self {
1646            ListenError::Aborted => write!(
1647                f,
1648                "Listen error: Pending connection attempt has been aborted."
1649            ),
1650            ListenError::WrongPeerId { obtained, endpoint } => write!(
1651                f,
1652                "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1653            ),
1654            ListenError::Transport(_) => {
1655                write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1656            }
1657            ListenError::Denied { cause } => {
1658                write!(f, "Listen error: Denied: {cause}")
1659            }
1660            ListenError::LocalPeerId { endpoint } => {
1661                write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1662            }
1663        }
1664    }
1665}
1666
1667impl error::Error for ListenError {
1668    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1669        match self {
1670            ListenError::WrongPeerId { .. } => None,
1671            ListenError::Transport(err) => Some(err),
1672            ListenError::Aborted => None,
1673            ListenError::Denied { cause } => Some(cause),
1674            ListenError::LocalPeerId { .. } => None,
1675        }
1676    }
1677}
1678
/// A connection was denied.
///
/// To figure out which [`NetworkBehaviour`] denied the connection, use
/// [`ConnectionDenied::downcast`].
#[derive(Debug)]
pub struct ConnectionDenied {
    /// Type-erased reason for the denial.
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause` (anything convertible into a boxed error) as the reason for
    /// denying a connection.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner = cause.into();
        Self { inner }
    }

    /// Tries to recover the concrete reason of type `E` by value.
    ///
    /// When the stored reason is not an `E`, the untouched [`ConnectionDenied`] is handed
    /// back in the `Err` variant.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(reason) => Ok(*reason),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Borrows the concrete reason as an `E`, or returns `None` when the stored reason
    /// has a different type.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let Self { inner } = self;
        inner.downcast_ref::<E>()
    }
}
1716
1717impl fmt::Display for ConnectionDenied {
1718    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1719        write!(f, "connection denied")
1720    }
1721}
1722
1723impl error::Error for ConnectionDenied {
1724    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1725        Some(self.inner.as_ref())
1726    }
1727}
1728
1729/// Information about the connections obtained by [`Swarm::network_info()`].
1730#[derive(Clone, Debug)]
1731pub struct NetworkInfo {
1732    /// The total number of connected peers.
1733    num_peers: usize,
1734    /// Counters of ongoing network connections.
1735    connection_counters: ConnectionCounters,
1736}
1737
1738impl NetworkInfo {
1739    /// The number of connected peers, i.e. peers with whom at least
1740    /// one established connection exists.
1741    pub fn num_peers(&self) -> usize {
1742        self.num_peers
1743    }
1744
1745    /// Gets counters for ongoing network connections.
1746    pub fn connection_counters(&self) -> &ConnectionCounters {
1747        &self.connection_counters
1748    }
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753    use ant_libp2p_core::{
1754        multiaddr,
1755        multiaddr::multiaddr,
1756        transport,
1757        transport::{memory::MemoryTransportError, PortUse, TransportEvent},
1758        upgrade, Endpoint,
1759    };
1760    use libp2p_identity as identity;
1761    use libp2p_plaintext as plaintext;
1762    use libp2p_yamux as yamux;
1763    use quickcheck::*;
1764
1765    use super::*;
1766    use crate::test::{CallTraceBehaviour, MockBehaviour};
1767
    // Test execution state, used by the connect/disconnect tests below to track
    // which phase they are waiting on.
    // Connecting => Disconnecting => Connecting.
    enum State {
        // Waiting for the expected connections between the two swarms.
        Connecting,
        // A disconnect has been requested; waiting for it to be observed.
        Disconnecting,
    }
1774
1775    fn new_test_swarm(
1776        config: Config,
1777    ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1778        let id_keys = identity::Keypair::generate_ed25519();
1779        let local_public_key = id_keys.public();
1780        let transport = transport::MemoryTransport::default()
1781            .upgrade(upgrade::Version::V1)
1782            .authenticate(plaintext::Config::new(&id_keys))
1783            .multiplex(yamux::Config::default())
1784            .boxed();
1785        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1786
1787        Swarm::new(
1788            transport,
1789            behaviour,
1790            local_public_key.into(),
1791            config.with_idle_connection_timeout(Duration::from_secs(5)),
1792        )
1793    }
1794
1795    fn swarms_connected<TBehaviour>(
1796        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1797        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1798        num_connections: usize,
1799    ) -> bool
1800    where
1801        TBehaviour: NetworkBehaviour,
1802        THandlerOutEvent<TBehaviour>: Clone,
1803    {
1804        swarm1
1805            .behaviour()
1806            .num_connections_to_peer(*swarm2.local_peer_id())
1807            == num_connections
1808            && swarm2
1809                .behaviour()
1810                .num_connections_to_peer(*swarm1.local_peer_id())
1811                == num_connections
1812            && swarm1.is_connected(swarm2.local_peer_id())
1813            && swarm2.is_connected(swarm1.local_peer_id())
1814    }
1815
1816    fn swarms_disconnected<TBehaviour>(
1817        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1818        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1819    ) -> bool
1820    where
1821        TBehaviour: NetworkBehaviour,
1822        THandlerOutEvent<TBehaviour>: Clone,
1823    {
1824        swarm1
1825            .behaviour()
1826            .num_connections_to_peer(*swarm2.local_peer_id())
1827            == 0
1828            && swarm2
1829                .behaviour()
1830                .num_connections_to_peer(*swarm1.local_peer_id())
1831                == 0
1832            && !swarm1.is_connected(swarm2.local_peer_id())
1833            && !swarm2.is_connected(swarm1.local_peer_id())
1834    }
1835
    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
    ///
    /// The test expects both behaviours to be notified via calls to
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
    /// / [`FromSwarm::ConnectionClosed`]
    ///
    /// After the disconnect is observed, the second swarm dials back and the test
    /// finishes once the swarms are connected again.
    #[tokio::test]
    async fn test_swarm_disconnect() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        // Random in-memory listen addresses for each swarm.
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Set once the first connect/disconnect round has completed.
        let mut reconnected = false;
        let num_connections = 10;

        // First round: swarm1 dials swarm2 `num_connections` times.
        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Second time all connections are up: the test is done.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        swarm2
                            .disconnect_peer_id(swarm1_id)
                            .expect("Error disconnecting");
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        // NOTE(review): `state` only becomes `Disconnecting` while
                        // `reconnected` is still false, so this early return looks
                        // unreachable — confirm.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        reconnected = true;
                        // Second round: this time swarm2 dials swarm1.
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                    }
                }
            }

            // Only yield once neither swarm has produced an event in this iteration.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1898
    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via calls to
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
    /// / [`FromSwarm::ConnectionClosed`]
    ///
    /// After the disconnect is observed, the second swarm dials back and the test
    /// finishes once the swarms are connected again.
    #[tokio::test]
    async fn test_behaviour_disconnect_all() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        // Random in-memory listen addresses for each swarm.
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Set once the first connect/disconnect round has completed.
        let mut reconnected = false;
        let num_connections = 10;

        // First round: swarm1 dials swarm2 `num_connections` times.
        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Second time all connections are up: the test is done.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        // Queue a "close all connections to swarm1" action on swarm2's
                        // mock behaviour.
                        swarm2
                            .behaviour
                            .inner()
                            .next_action
                            .replace(ToSwarm::CloseConnection {
                                peer_id: swarm1_id,
                                connection: CloseConnection::All,
                            });
                        state = State::Disconnecting;
                        // Poll again right away, skipping the pending check below.
                        continue;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        reconnected = true;
                        // Second round: this time swarm2 dials swarm1.
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                        // Poll again right away, skipping the pending check below.
                        continue;
                    }
                }
            }

            // Only yield once neither swarm has produced an event in this iteration.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1966
    /// Establishes multiple connections between two peers,
    /// after which one peer closes a single connection
    /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via calls to
    /// [`NetworkBehaviour::on_swarm_event`] with pairs of [`FromSwarm::ConnectionEstablished`]
    /// / [`FromSwarm::ConnectionClosed`]
    ///
    /// The test finishes once both behaviours have recorded exactly one closed
    /// connection and its id matches the one that was requested to be closed.
    #[tokio::test]
    async fn test_behaviour_disconnect_one() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        // Random in-memory listen addresses for each swarm.
        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let num_connections = 10;

        // swarm1 dials swarm2 `num_connections` times.
        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;
        // Id of the connection that swarm2's behaviour was told to close.
        let mut disconnected_conn_id = None;

        future::poll_fn(move |cx| loop {
            // Drive both swarms before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        disconnected_conn_id = {
                            // Pick the connection in the middle of the recorded
                            // establishment events and ask swarm2 to close only that one.
                            let conn_id =
                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
                            swarm2.behaviour.inner().next_action.replace(
                                ToSwarm::CloseConnection {
                                    peer_id: swarm1_id,
                                    connection: CloseConnection::One(conn_id),
                                },
                            );
                            Some(conn_id)
                        };
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    // Invariants that must hold on both sides on every iteration while
                    // waiting for the close to be reported.
                    for s in &[&swarm1, &swarm2] {
                        assert!(s
                            .behaviour
                            .on_connection_closed
                            .iter()
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
                        s.behaviour.assert_connected(num_connections, 1);
                    }
                    // Done once each side has recorded exactly one closed connection.
                    if [&swarm1, &swarm2]
                        .iter()
                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
                    {
                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
                        assert_eq!(Some(conn_id), disconnected_conn_id);
                        return Poll::Ready(());
                    }
                }
            }

            // Only yield once neither swarm has produced an event in this iteration.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
2042
    /// Property test: for an arbitrary dial concurrency factor in `1..11`, a swarm dialing
    /// a peer at `concurrency_factor + 2` listening addresses must reach every listener
    /// without emitting a swarm event in the meantime, and must then report an
    /// [`SwarmEvent::OutgoingConnectionError`].
    #[test]
    fn concurrent_dialing() {
        // Newtype so that quickcheck only generates factors in the range `1..11`.
        #[derive(Clone, Debug)]
        struct DialConcurrencyFactor(NonZeroU8);

        impl Arbitrary for DialConcurrencyFactor {
            fn arbitrary(g: &mut Gen) -> Self {
                Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
            }
        }

        fn prop(concurrency_factor: DialConcurrencyFactor) {
            tokio::runtime::Runtime::new().unwrap().block_on(async {
                let mut swarm = new_test_swarm(
                    Config::with_tokio_executor()
                        .with_dial_concurrency_factor(concurrency_factor.0),
                );

                // Listen on `concurrency_factor + 2` addresses.
                //
                // `+ 2` so that there are more listen addresses than the dial concurrency
                // factor, i.e. only a subset of the addresses is dialed by the swarm at once.
                let num_listen_addrs = concurrency_factor.0.get() + 2;
                let mut listen_addresses = Vec::new();
                let mut transports = Vec::new();
                for _ in 0..num_listen_addrs {
                    // Each listener is a separate bare memory transport.
                    let mut transport = transport::MemoryTransport::default().boxed();
                    transport
                        .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
                        .unwrap();

                    // The first event of a fresh listener must announce its address.
                    match transport.select_next_some().await {
                        TransportEvent::NewAddress { listen_addr, .. } => {
                            listen_addresses.push(listen_addr);
                        }
                        _ => panic!("Expected `NewListenAddr` event."),
                    }

                    transports.push(transport);
                }

                // Have swarm dial each listener and wait for each listener to receive the incoming
                // connections.
                swarm
                    .dial(
                        DialOpts::peer_id(PeerId::random())
                            .addresses(listen_addresses)
                            .build(),
                    )
                    .unwrap();
                for mut transport in transports.into_iter() {
                    // The swarm is polled alongside each listener; it must stay silent
                    // until every listener has seen its incoming connection.
                    match futures::future::select(transport.select_next_some(), swarm.next()).await
                    {
                        future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
                        future::Either::Left(_) => {
                            panic!("Unexpected transport event.")
                        }
                        future::Either::Right((e, _)) => {
                            panic!("Expect swarm to not emit any event {e:?}")
                        }
                    }
                }

                // The listeners are dropped without completing the connections, so the dial
                // is expected to end in an error.
                match swarm.next().await.unwrap() {
                    SwarmEvent::OutgoingConnectionError { .. } => {}
                    e => panic!("Unexpected swarm event {e:?}"),
                }
            })
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
    }
2114
    /// Dials a listening swarm through an address that carries a different, random peer id
    /// and checks the resulting [`DialError::WrongPeerId`].
    #[tokio::test]
    async fn invalid_peer_id() {
        // Checks whether dialing an address containing the wrong peer id raises an error
        // for the expected peer id instead of the obtained peer id.

        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();

        // Wait for swarm1 to report the address it is actually listening on.
        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // Append a peer id that does not belong to swarm1.
        let other_id = PeerId::random();
        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));

        swarm2.dial(other_addr.clone()).unwrap();

        let (peer_id, error) = future::poll_fn(|cx| {
            // Poll swarm1 so that it makes progress; whatever it yields is ignored.
            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
                swarm1.poll_next_unpin(cx)
            {}

            // The first event swarm2 produces must be the dial failure.
            match swarm2.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                    peer_id, error, ..
                })) => Poll::Ready((peer_id, error)),
                Poll::Ready(x) => panic!("unexpected {x:?}"),
                Poll::Pending => Poll::Pending,
            }
        })
        .await;
        // The event is attributed to the peer id that was dialed ...
        assert_eq!(peer_id.unwrap(), other_id);
        match error {
            // ... while the error carries the identity swarm1 actually presented.
            DialError::WrongPeerId { obtained, endpoint } => {
                assert_eq!(obtained, *swarm1.local_peer_id());
                assert_eq!(
                    endpoint,
                    ConnectedPoint::Dialer {
                        address: other_addr,
                        role_override: Endpoint::Dialer,
                        port_use: PortUse::Reuse,
                    }
                );
            }
            x => panic!("wrong error {x:?}"),
        }
    }
2167
    /// Makes a swarm dial its own listen address and checks that both the dialing and the
    /// listening side report a failure.
    #[tokio::test]
    async fn dial_self() {
        // Check whether dialing ourselves correctly fails.
        //
        // Dialing the same address we're listening should result in three events:
        //
        // - The incoming connection notification (before we know the incoming peer ID).
        // - The connection error for the dialing endpoint (once we've determined that it's our own
        //   ID).
        // - The connection error for the listening endpoint (once we've determined that it's our
        //   own ID).
        //
        // The last two can happen in any order.

        let mut swarm = new_test_swarm(Config::with_tokio_executor());
        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();

        // Wait for the swarm to report the address it is actually listening on.
        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // This is a hack to actually execute the dial
        // to ourselves which would otherwise be filtered.
        swarm.listened_addrs.clear();
        swarm.dial(local_address.clone()).unwrap();

        // Each of the two errors must be seen exactly once; the test completes when
        // both have arrived.
        let mut got_dial_err = false;
        let mut got_inc_err = false;
        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
            loop {
                match swarm.poll_next_unpin(cx) {
                    // Dialing side: the remote turned out to be ourselves.
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                        peer_id,
                        error: DialError::LocalPeerId { .. },
                        ..
                    })) => {
                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
                        assert!(!got_dial_err);
                        got_dial_err = true;
                        if got_inc_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    // Listening side: any incoming connection error is accepted here.
                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
                        local_addr, ..
                    })) => {
                        assert!(!got_inc_err);
                        assert_eq!(local_addr, local_address);
                        got_inc_err = true;
                        if got_dial_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
                        assert_eq!(local_addr, local_address);
                    }
                    Poll::Ready(ev) => {
                        panic!("Unexpected event: {ev:?}")
                    }
                    Poll::Pending => break Poll::Pending,
                }
            }
        })
        .await
        .unwrap();
    }
2237
2238    #[tokio::test]
2239    async fn dial_self_by_id() {
2240        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
2241        // place.
2242        let swarm = new_test_swarm(Config::with_tokio_executor());
2243        let peer_id = *swarm.local_peer_id();
2244        assert!(!swarm.is_connected(&peer_id));
2245    }
2246
2247    #[tokio::test]
2248    async fn multiple_addresses_err() {
2249        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.
2250
2251        let target = PeerId::random();
2252
2253        let mut swarm = new_test_swarm(Config::with_tokio_executor());
2254
2255        let addresses = HashSet::from([
2256            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2257            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2258            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
2259            multiaddr![Udp(rand::random::<u16>())],
2260            multiaddr![Udp(rand::random::<u16>())],
2261            multiaddr![Udp(rand::random::<u16>())],
2262            multiaddr![Udp(rand::random::<u16>())],
2263            multiaddr![Udp(rand::random::<u16>())],
2264        ]);
2265
2266        swarm
2267            .dial(
2268                DialOpts::peer_id(target)
2269                    .addresses(addresses.iter().cloned().collect())
2270                    .build(),
2271            )
2272            .unwrap();
2273
2274        match swarm.next().await.unwrap() {
2275            SwarmEvent::OutgoingConnectionError {
2276                peer_id,
2277                // multiaddr,
2278                error: DialError::Transport(errors),
2279                ..
2280            } => {
2281                assert_eq!(target, peer_id.unwrap());
2282
2283                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
2284                let expected_addresses = addresses
2285                    .into_iter()
2286                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
2287                    .collect::<Vec<_>>();
2288
2289                assert_eq!(expected_addresses, failed_addresses);
2290            }
2291            e => panic!("Unexpected event: {e:?}"),
2292        }
2293    }
2294
2295    #[tokio::test]
2296    async fn aborting_pending_connection_surfaces_error() {
2297        let _ = tracing_subscriber::fmt()
2298            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
2299            .try_init();
2300
2301        let mut dialer = new_test_swarm(Config::with_tokio_executor());
2302        let mut listener = new_test_swarm(Config::with_tokio_executor());
2303
2304        let listener_peer_id = *listener.local_peer_id();
2305        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
2306        let listener_address = match listener.next().await.unwrap() {
2307            SwarmEvent::NewListenAddr { address, .. } => address,
2308            e => panic!("Unexpected network event: {e:?}"),
2309        };
2310
2311        dialer
2312            .dial(
2313                DialOpts::peer_id(listener_peer_id)
2314                    .addresses(vec![listener_address])
2315                    .build(),
2316            )
2317            .unwrap();
2318
2319        dialer
2320            .disconnect_peer_id(listener_peer_id)
2321            .expect_err("Expect peer to not yet be connected.");
2322
2323        match dialer.next().await.unwrap() {
2324            SwarmEvent::OutgoingConnectionError {
2325                error: DialError::Aborted,
2326                ..
2327            } => {}
2328            e => panic!("Unexpected swarm event {e:?}."),
2329        }
2330    }
2331
2332    #[test]
2333    fn dial_error_prints_sources() {
2334        // This constitutes a fairly typical error for chained transports.
2335        let error = DialError::Transport(vec![(
2336            "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2337            TransportError::Other(io::Error::new(
2338                io::ErrorKind::Other,
2339                MemoryTransportError::Unreachable,
2340            )),
2341        )]);
2342
2343        let string = format!("{error}");
2344
2345        // Unfortunately, we have some "empty" errors
2346        // that lead to multiple colons without text but that is the best we can do.
2347        assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2348    }
2349}