1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
55
56mod connection;
57mod executor;
58mod stream;
59mod stream_protocol;
60#[cfg(test)]
61mod test;
62mod upgrade;
63
64pub mod behaviour;
65pub mod dial_opts;
66pub mod dummy;
67pub mod handler;
68mod listen_opts;
69mod translation;
70
71#[doc(hidden)]
73pub mod derive_prelude {
74 pub use either::Either;
75 pub use futures::prelude as futures;
76 pub use ant_libp2p_core::{
77 transport::{ListenerId, PortUse},
78 ConnectedPoint, Endpoint, Multiaddr,
79 };
80 pub use libp2p_identity::PeerId;
81
82 pub use crate::{
83 behaviour::{
84 AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredListenAddr,
85 ExternalAddrConfirmed, ExternalAddrExpired, FromSwarm, ListenFailure, ListenerClosed,
86 ListenerError, NewExternalAddrCandidate, NewExternalAddrOfPeer, NewListenAddr,
87 NewListener,
88 },
89 connection::ConnectionId,
90 ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, DialError, NetworkBehaviour,
91 THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
92 };
93}
94
95use std::{
96 collections::{HashMap, HashSet, VecDeque},
97 error, fmt, io,
98 num::{NonZeroU32, NonZeroU8, NonZeroUsize},
99 pin::Pin,
100 task::{Context, Poll},
101 time::Duration,
102};
103
104pub use behaviour::{
105 AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredListenAddr,
106 ExternalAddrExpired, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
107 ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddrCandidate,
108 NewExternalAddrOfPeer, NewListenAddr, NotifyHandler, PeerAddresses, ToSwarm,
109};
110pub use connection::{pool::ConnectionCounters, ConnectionError, ConnectionId, SupportedProtocols};
111use connection::{
112 pool::{EstablishedConnection, Pool, PoolConfig, PoolEvent},
113 IncomingInfo, PendingConnectionError, PendingInboundConnectionError,
114 PendingOutboundConnectionError,
115};
116use dial_opts::{DialOpts, PeerCondition};
117pub use executor::Executor;
118use futures::{prelude::*, stream::FusedStream};
119pub use handler::{
120 ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler,
121 OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol,
122};
123use ant_libp2p_core::{
124 connection::ConnectedPoint,
125 muxing::StreamMuxerBox,
126 transport::{self, ListenerId, TransportError, TransportEvent},
127 Multiaddr, Transport,
128};
129use libp2p_identity::PeerId;
130#[cfg(feature = "macros")]
131pub use libp2p_swarm_derive::NetworkBehaviour;
132pub use listen_opts::ListenOpts;
133use smallvec::SmallVec;
134pub use stream::Stream;
135pub use stream_protocol::{InvalidProtocol, StreamProtocol};
136use tracing::Instrument;
137#[doc(hidden)]
138pub use translation::_address_translation;
139
140use crate::{behaviour::ExternalAddrConfirmed, handler::UpgradeInfoSend};
141
142type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::ToSwarm;
144
145pub type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
148
149pub type THandlerInEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::FromBehaviour;
152
153pub type THandlerOutEvent<TBehaviour> = <THandler<TBehaviour> as ConnectionHandler>::ToBehaviour;
155
156#[derive(Debug)]
158#[non_exhaustive]
159pub enum SwarmEvent<TBehaviourOutEvent> {
160 Behaviour(TBehaviourOutEvent),
162 ConnectionEstablished {
164 peer_id: PeerId,
166 connection_id: ConnectionId,
168 endpoint: ConnectedPoint,
170 num_established: NonZeroU32,
173 concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
177 established_in: std::time::Duration,
179 },
180 ConnectionClosed {
183 peer_id: PeerId,
185 connection_id: ConnectionId,
187 endpoint: ConnectedPoint,
189 num_established: u32,
191 cause: Option<ConnectionError>,
194 },
195 IncomingConnection {
201 connection_id: ConnectionId,
203 local_addr: Multiaddr,
207 send_back_addr: Multiaddr,
209 },
210 IncomingConnectionError {
215 connection_id: ConnectionId,
217 local_addr: Multiaddr,
221 send_back_addr: Multiaddr,
223 error: ListenError,
225 },
226 OutgoingConnectionError {
228 connection_id: ConnectionId,
230 peer_id: Option<PeerId>,
232 error: DialError,
234 },
235 NewListenAddr {
237 listener_id: ListenerId,
239 address: Multiaddr,
241 },
242 ExpiredListenAddr {
244 listener_id: ListenerId,
246 address: Multiaddr,
248 },
249 ListenerClosed {
251 listener_id: ListenerId,
253 addresses: Vec<Multiaddr>,
257 reason: Result<(), io::Error>,
260 },
261 ListenerError {
263 listener_id: ListenerId,
265 error: io::Error,
267 },
268 Dialing {
276 peer_id: Option<PeerId>,
278
279 connection_id: ConnectionId,
281 },
282 NewExternalAddrCandidate { address: Multiaddr },
284 ExternalAddrConfirmed { address: Multiaddr },
286 ExternalAddrExpired { address: Multiaddr },
288 NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
290}
291
292impl<TBehaviourOutEvent> SwarmEvent<TBehaviourOutEvent> {
293 #[allow(clippy::result_large_err)]
296 pub fn try_into_behaviour_event(self) -> Result<TBehaviourOutEvent, Self> {
297 match self {
298 SwarmEvent::Behaviour(inner) => Ok(inner),
299 other => Err(other),
300 }
301 }
302}
303
304pub struct Swarm<TBehaviour>
309where
310 TBehaviour: NetworkBehaviour,
311{
312 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
314
315 pool: Pool<THandler<TBehaviour>>,
317
318 local_peer_id: PeerId,
320
321 behaviour: TBehaviour,
324
325 supported_protocols: SmallVec<[Vec<u8>; 16]>,
327
328 confirmed_external_addr: HashSet<Multiaddr>,
329
330 listened_addrs: HashMap<ListenerId, SmallVec<[Multiaddr; 1]>>,
332
333 pending_handler_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
337
338 pending_swarm_events: VecDeque<SwarmEvent<TBehaviour::ToSwarm>>,
339}
340
341impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
342
343impl<TBehaviour> Swarm<TBehaviour>
344where
345 TBehaviour: NetworkBehaviour,
346{
347 pub fn new(
350 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
351 behaviour: TBehaviour,
352 local_peer_id: PeerId,
353 config: Config,
354 ) -> Self {
355 tracing::info!(%local_peer_id);
356
357 Swarm {
358 local_peer_id,
359 transport,
360 pool: Pool::new(local_peer_id, config.pool_config),
361 behaviour,
362 supported_protocols: Default::default(),
363 confirmed_external_addr: Default::default(),
364 listened_addrs: HashMap::new(),
365 pending_handler_event: None,
366 pending_swarm_events: VecDeque::default(),
367 }
368 }
369
370 pub fn network_info(&self) -> NetworkInfo {
372 let num_peers = self.pool.num_peers();
373 let connection_counters = self.pool.counters().clone();
374 NetworkInfo {
375 num_peers,
376 connection_counters,
377 }
378 }
379
380 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
386 let opts = ListenOpts::new(addr);
387 let id = opts.listener_id();
388 self.add_listener(opts)?;
389 Ok(id)
390 }
391
392 pub fn remove_listener(&mut self, listener_id: ListenerId) -> bool {
397 self.transport.remove_listener(listener_id)
398 }
399
400 pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
428 let dial_opts = opts.into();
429
430 let peer_id = dial_opts.get_peer_id();
431 let condition = dial_opts.peer_condition();
432 let connection_id = dial_opts.connection_id();
433
434 let should_dial = match (condition, peer_id) {
435 (_, None) => true,
436 (PeerCondition::Always, _) => true,
437 (PeerCondition::Disconnected, Some(peer_id)) => !self.pool.is_connected(peer_id),
438 (PeerCondition::NotDialing, Some(peer_id)) => !self.pool.is_dialing(peer_id),
439 (PeerCondition::DisconnectedAndNotDialing, Some(peer_id)) => {
440 !self.pool.is_dialing(peer_id) && !self.pool.is_connected(peer_id)
441 }
442 };
443
444 if !should_dial {
445 let e = DialError::DialPeerConditionFalse(condition);
446
447 self.behaviour
448 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
449 peer_id,
450 error: &e,
451 connection_id,
452 }));
453
454 return Err(e);
455 }
456
457 let addresses = {
458 let mut addresses_from_opts = dial_opts.get_addresses();
459
460 match self.behaviour.handle_pending_outbound_connection(
461 connection_id,
462 peer_id,
463 addresses_from_opts.as_slice(),
464 dial_opts.role_override(),
465 ) {
466 Ok(addresses) => {
467 if dial_opts.extend_addresses_through_behaviour() {
468 addresses_from_opts.extend(addresses)
469 } else {
470 let num_addresses = addresses.len();
471
472 if num_addresses > 0 {
473 tracing::debug!(
474 connection=%connection_id,
475 discarded_addresses_count=%num_addresses,
476 "discarding addresses from `NetworkBehaviour` because `DialOpts::extend_addresses_through_behaviour is `false` for connection"
477 )
478 }
479 }
480 }
481 Err(cause) => {
482 let error = DialError::Denied { cause };
483
484 self.behaviour
485 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
486 peer_id,
487 error: &error,
488 connection_id,
489 }));
490
491 return Err(error);
492 }
493 }
494
495 let mut unique_addresses = HashSet::new();
496 addresses_from_opts.retain(|addr| {
497 !self.listened_addrs.values().flatten().any(|a| a == addr)
498 && unique_addresses.insert(addr.clone())
499 });
500
501 if addresses_from_opts.is_empty() {
502 let error = DialError::NoAddresses;
503 self.behaviour
504 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
505 peer_id,
506 error: &error,
507 connection_id,
508 }));
509 return Err(error);
510 };
511
512 addresses_from_opts
513 };
514
515 let dials = addresses
516 .into_iter()
517 .map(|a| match peer_id.map_or(Ok(a.clone()), |p| a.with_p2p(p)) {
518 Ok(address) => {
519 let dial = self.transport.dial(
520 address.clone(),
521 transport::DialOpts {
522 role: dial_opts.role_override(),
523 port_use: dial_opts.port_use(),
524 },
525 );
526 let span = tracing::debug_span!(parent: tracing::Span::none(), "Transport::dial", %address);
527 span.follows_from(tracing::Span::current());
528 match dial {
529 Ok(fut) => fut
530 .map(|r| (address, r.map_err(TransportError::Other)))
531 .instrument(span)
532 .boxed(),
533 Err(err) => futures::future::ready((address, Err(err))).boxed(),
534 }
535 }
536 Err(address) => futures::future::ready((
537 address.clone(),
538 Err(TransportError::MultiaddrNotSupported(address)),
539 ))
540 .boxed(),
541 })
542 .collect();
543
544 self.pool.add_outgoing(
545 dials,
546 peer_id,
547 dial_opts.role_override(),
548 dial_opts.port_use(),
549 dial_opts.dial_concurrency_override(),
550 connection_id,
551 );
552
553 Ok(())
554 }
555
556 pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
558 self.listened_addrs.values().flatten()
559 }
560
561 pub fn local_peer_id(&self) -> &PeerId {
563 &self.local_peer_id
564 }
565
566 pub fn external_addresses(&self) -> impl Iterator<Item = &Multiaddr> {
568 self.confirmed_external_addr.iter()
569 }
570
571 fn add_listener(&mut self, opts: ListenOpts) -> Result<(), TransportError<io::Error>> {
572 let addr = opts.address();
573 let listener_id = opts.listener_id();
574
575 if let Err(e) = self.transport.listen_on(listener_id, addr.clone()) {
576 self.behaviour
577 .on_swarm_event(FromSwarm::ListenerError(behaviour::ListenerError {
578 listener_id,
579 err: &e,
580 }));
581
582 return Err(e);
583 }
584
585 self.behaviour
586 .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
587 listener_id,
588 }));
589
590 Ok(())
591 }
592
593 pub fn add_external_address(&mut self, a: Multiaddr) {
599 self.behaviour
600 .on_swarm_event(FromSwarm::ExternalAddrConfirmed(ExternalAddrConfirmed {
601 addr: &a,
602 }));
603 self.confirmed_external_addr.insert(a);
604 }
605
606 pub fn remove_external_address(&mut self, addr: &Multiaddr) {
611 self.behaviour
612 .on_swarm_event(FromSwarm::ExternalAddrExpired(ExternalAddrExpired { addr }));
613 self.confirmed_external_addr.remove(addr);
614 }
615
616 pub fn add_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
621 self.behaviour
622 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
623 peer_id,
624 addr: &addr,
625 }))
626 }
627
628 #[allow(clippy::result_unit_err)]
636 pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
637 let was_connected = self.pool.is_connected(peer_id);
638 self.pool.disconnect(peer_id);
639
640 if was_connected {
641 Ok(())
642 } else {
643 Err(())
644 }
645 }
646
647 pub fn close_connection(&mut self, connection_id: ConnectionId) -> bool {
658 if let Some(established) = self.pool.get_established(connection_id) {
659 established.start_close();
660 return true;
661 }
662
663 false
664 }
665
666 pub fn is_connected(&self, peer_id: &PeerId) -> bool {
668 self.pool.is_connected(*peer_id)
669 }
670
671 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
673 self.pool.iter_connected()
674 }
675
676 pub fn behaviour(&self) -> &TBehaviour {
678 &self.behaviour
679 }
680
681 pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
683 &mut self.behaviour
684 }
685
686 fn handle_pool_event(&mut self, event: PoolEvent<THandlerOutEvent<TBehaviour>>) {
687 match event {
688 PoolEvent::ConnectionEstablished {
689 peer_id,
690 id,
691 endpoint,
692 connection,
693 concurrent_dial_errors,
694 established_in,
695 } => {
696 let handler = match endpoint.clone() {
697 ConnectedPoint::Dialer {
698 address,
699 role_override,
700 port_use,
701 } => {
702 match self.behaviour.handle_established_outbound_connection(
703 id,
704 peer_id,
705 &address,
706 role_override,
707 port_use,
708 ) {
709 Ok(handler) => handler,
710 Err(cause) => {
711 let dial_error = DialError::Denied { cause };
712 self.behaviour.on_swarm_event(FromSwarm::DialFailure(
713 DialFailure {
714 connection_id: id,
715 error: &dial_error,
716 peer_id: Some(peer_id),
717 },
718 ));
719
720 self.pending_swarm_events.push_back(
721 SwarmEvent::OutgoingConnectionError {
722 peer_id: Some(peer_id),
723 connection_id: id,
724 error: dial_error,
725 },
726 );
727 return;
728 }
729 }
730 }
731 ConnectedPoint::Listener {
732 local_addr,
733 send_back_addr,
734 } => {
735 match self.behaviour.handle_established_inbound_connection(
736 id,
737 peer_id,
738 &local_addr,
739 &send_back_addr,
740 ) {
741 Ok(handler) => handler,
742 Err(cause) => {
743 let listen_error = ListenError::Denied { cause };
744 self.behaviour.on_swarm_event(FromSwarm::ListenFailure(
745 ListenFailure {
746 local_addr: &local_addr,
747 send_back_addr: &send_back_addr,
748 error: &listen_error,
749 connection_id: id,
750 peer_id: Some(peer_id),
751 },
752 ));
753
754 self.pending_swarm_events.push_back(
755 SwarmEvent::IncomingConnectionError {
756 connection_id: id,
757 send_back_addr,
758 local_addr,
759 error: listen_error,
760 },
761 );
762 return;
763 }
764 }
765 }
766 };
767
768 let supported_protocols = handler
769 .listen_protocol()
770 .upgrade()
771 .protocol_info()
772 .map(|p| p.as_ref().as_bytes().to_vec())
773 .collect();
774 let other_established_connection_ids = self
775 .pool
776 .iter_established_connections_of_peer(&peer_id)
777 .collect::<Vec<_>>();
778 let num_established = NonZeroU32::new(
779 u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
780 )
781 .expect("n + 1 is always non-zero; qed");
782
783 self.pool
784 .spawn_connection(id, peer_id, &endpoint, connection, handler);
785
786 tracing::debug!(
787 peer=%peer_id,
788 ?endpoint,
789 total_peers=%num_established,
790 "Connection established"
791 );
792 let failed_addresses = concurrent_dial_errors
793 .as_ref()
794 .map(|es| {
795 es.iter()
796 .map(|(a, _)| a)
797 .cloned()
798 .collect::<Vec<Multiaddr>>()
799 })
800 .unwrap_or_default();
801 self.behaviour
802 .on_swarm_event(FromSwarm::ConnectionEstablished(
803 behaviour::ConnectionEstablished {
804 peer_id,
805 connection_id: id,
806 endpoint: &endpoint,
807 failed_addresses: &failed_addresses,
808 other_established: other_established_connection_ids.len(),
809 },
810 ));
811 self.supported_protocols = supported_protocols;
812 self.pending_swarm_events
813 .push_back(SwarmEvent::ConnectionEstablished {
814 peer_id,
815 connection_id: id,
816 num_established,
817 endpoint,
818 concurrent_dial_errors,
819 established_in,
820 });
821 }
822 PoolEvent::PendingOutboundConnectionError {
823 id: connection_id,
824 error,
825 peer,
826 } => {
827 let error = error.into();
828
829 self.behaviour
830 .on_swarm_event(FromSwarm::DialFailure(DialFailure {
831 peer_id: peer,
832 error: &error,
833 connection_id,
834 }));
835
836 if let Some(peer) = peer {
837 tracing::debug!(%peer, "Connection attempt to peer failed with {:?}.", error,);
838 } else {
839 tracing::debug!("Connection attempt to unknown peer failed with {:?}", error);
840 }
841
842 self.pending_swarm_events
843 .push_back(SwarmEvent::OutgoingConnectionError {
844 peer_id: peer,
845 connection_id,
846 error,
847 });
848 }
849 PoolEvent::PendingInboundConnectionError {
850 id,
851 send_back_addr,
852 local_addr,
853 error,
854 } => {
855 let error = error.into();
856
857 tracing::debug!("Incoming connection failed: {:?}", error);
858 self.behaviour
859 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
860 local_addr: &local_addr,
861 send_back_addr: &send_back_addr,
862 error: &error,
863 connection_id: id,
864 peer_id: None,
865 }));
866 self.pending_swarm_events
867 .push_back(SwarmEvent::IncomingConnectionError {
868 connection_id: id,
869 local_addr,
870 send_back_addr,
871 error,
872 });
873 }
874 PoolEvent::ConnectionClosed {
875 id,
876 connected,
877 error,
878 remaining_established_connection_ids,
879 ..
880 } => {
881 if let Some(error) = error.as_ref() {
882 tracing::debug!(
883 total_peers=%remaining_established_connection_ids.len(),
884 "Connection closed with error {:?}: {:?}",
885 error,
886 connected,
887 );
888 } else {
889 tracing::debug!(
890 total_peers=%remaining_established_connection_ids.len(),
891 "Connection closed: {:?}",
892 connected
893 );
894 }
895 let peer_id = connected.peer_id;
896 let endpoint = connected.endpoint;
897 let num_established =
898 u32::try_from(remaining_established_connection_ids.len()).unwrap();
899
900 self.behaviour
901 .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
902 peer_id,
903 connection_id: id,
904 endpoint: &endpoint,
905 cause: error.as_ref(),
906 remaining_established: num_established as usize,
907 }));
908 self.pending_swarm_events
909 .push_back(SwarmEvent::ConnectionClosed {
910 peer_id,
911 connection_id: id,
912 endpoint,
913 cause: error,
914 num_established,
915 });
916 }
917 PoolEvent::ConnectionEvent { peer_id, id, event } => {
918 self.behaviour
919 .on_connection_handler_event(peer_id, id, event);
920 }
921 PoolEvent::AddressChange {
922 peer_id,
923 id,
924 new_endpoint,
925 old_endpoint,
926 } => {
927 self.behaviour
928 .on_swarm_event(FromSwarm::AddressChange(AddressChange {
929 peer_id,
930 connection_id: id,
931 old: &old_endpoint,
932 new: &new_endpoint,
933 }));
934 }
935 }
936 }
937
938 fn handle_transport_event(
939 &mut self,
940 event: TransportEvent<
941 <transport::Boxed<(PeerId, StreamMuxerBox)> as Transport>::ListenerUpgrade,
942 io::Error,
943 >,
944 ) {
945 match event {
946 TransportEvent::Incoming {
947 listener_id: _,
948 upgrade,
949 local_addr,
950 send_back_addr,
951 } => {
952 let connection_id = ConnectionId::next();
953
954 match self.behaviour.handle_pending_inbound_connection(
955 connection_id,
956 &local_addr,
957 &send_back_addr,
958 ) {
959 Ok(()) => {}
960 Err(cause) => {
961 let listen_error = ListenError::Denied { cause };
962
963 self.behaviour
964 .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
965 local_addr: &local_addr,
966 send_back_addr: &send_back_addr,
967 error: &listen_error,
968 connection_id,
969 peer_id: None,
970 }));
971
972 self.pending_swarm_events
973 .push_back(SwarmEvent::IncomingConnectionError {
974 connection_id,
975 local_addr,
976 send_back_addr,
977 error: listen_error,
978 });
979 return;
980 }
981 }
982
983 self.pool.add_incoming(
984 upgrade,
985 IncomingInfo {
986 local_addr: &local_addr,
987 send_back_addr: &send_back_addr,
988 },
989 connection_id,
990 );
991
992 self.pending_swarm_events
993 .push_back(SwarmEvent::IncomingConnection {
994 connection_id,
995 local_addr,
996 send_back_addr,
997 })
998 }
999 TransportEvent::NewAddress {
1000 listener_id,
1001 listen_addr,
1002 } => {
1003 tracing::debug!(
1004 listener=?listener_id,
1005 address=%listen_addr,
1006 "New listener address"
1007 );
1008 let addrs = self.listened_addrs.entry(listener_id).or_default();
1009 if !addrs.contains(&listen_addr) {
1010 addrs.push(listen_addr.clone())
1011 }
1012 self.behaviour
1013 .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
1014 listener_id,
1015 addr: &listen_addr,
1016 }));
1017 self.pending_swarm_events
1018 .push_back(SwarmEvent::NewListenAddr {
1019 listener_id,
1020 address: listen_addr,
1021 })
1022 }
1023 TransportEvent::AddressExpired {
1024 listener_id,
1025 listen_addr,
1026 } => {
1027 tracing::debug!(
1028 listener=?listener_id,
1029 address=%listen_addr,
1030 "Expired listener address"
1031 );
1032 if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
1033 addrs.retain(|a| a != &listen_addr);
1034 }
1035 self.behaviour
1036 .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
1037 listener_id,
1038 addr: &listen_addr,
1039 }));
1040 self.pending_swarm_events
1041 .push_back(SwarmEvent::ExpiredListenAddr {
1042 listener_id,
1043 address: listen_addr,
1044 })
1045 }
1046 TransportEvent::ListenerClosed {
1047 listener_id,
1048 reason,
1049 } => {
1050 tracing::debug!(
1051 listener=?listener_id,
1052 ?reason,
1053 "Listener closed"
1054 );
1055 let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
1056 for addr in addrs.iter() {
1057 self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
1058 ExpiredListenAddr { listener_id, addr },
1059 ));
1060 }
1061 self.behaviour
1062 .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
1063 listener_id,
1064 reason: reason.as_ref().copied(),
1065 }));
1066 self.pending_swarm_events
1067 .push_back(SwarmEvent::ListenerClosed {
1068 listener_id,
1069 addresses: addrs.to_vec(),
1070 reason,
1071 })
1072 }
1073 TransportEvent::ListenerError { listener_id, error } => {
1074 self.behaviour
1075 .on_swarm_event(FromSwarm::ListenerError(ListenerError {
1076 listener_id,
1077 err: &error,
1078 }));
1079 self.pending_swarm_events
1080 .push_back(SwarmEvent::ListenerError { listener_id, error })
1081 }
1082 }
1083 }
1084
1085 fn handle_behaviour_event(
1086 &mut self,
1087 event: ToSwarm<TBehaviour::ToSwarm, THandlerInEvent<TBehaviour>>,
1088 ) {
1089 match event {
1090 ToSwarm::GenerateEvent(event) => {
1091 self.pending_swarm_events
1092 .push_back(SwarmEvent::Behaviour(event));
1093 }
1094 ToSwarm::Dial { opts } => {
1095 let peer_id = opts.get_peer_id();
1096 let connection_id = opts.connection_id();
1097 if let Ok(()) = self.dial(opts) {
1098 self.pending_swarm_events.push_back(SwarmEvent::Dialing {
1099 peer_id,
1100 connection_id,
1101 });
1102 }
1103 }
1104 ToSwarm::ListenOn { opts } => {
1105 let _ = self.add_listener(opts);
1107 }
1108 ToSwarm::RemoveListener { id } => {
1109 self.remove_listener(id);
1110 }
1111 ToSwarm::NotifyHandler {
1112 peer_id,
1113 handler,
1114 event,
1115 } => {
1116 assert!(self.pending_handler_event.is_none());
1117 let handler = match handler {
1118 NotifyHandler::One(connection) => PendingNotifyHandler::One(connection),
1119 NotifyHandler::Any => {
1120 let ids = self
1121 .pool
1122 .iter_established_connections_of_peer(&peer_id)
1123 .collect();
1124 PendingNotifyHandler::Any(ids)
1125 }
1126 };
1127
1128 self.pending_handler_event = Some((peer_id, handler, event));
1129 }
1130 ToSwarm::NewExternalAddrCandidate(addr) => {
1131 if !self.confirmed_external_addr.contains(&addr) {
1132 self.behaviour
1133 .on_swarm_event(FromSwarm::NewExternalAddrCandidate(
1134 NewExternalAddrCandidate { addr: &addr },
1135 ));
1136 self.pending_swarm_events
1137 .push_back(SwarmEvent::NewExternalAddrCandidate { address: addr });
1138 }
1139 }
1140 ToSwarm::ExternalAddrConfirmed(addr) => {
1141 self.add_external_address(addr.clone());
1142 self.pending_swarm_events
1143 .push_back(SwarmEvent::ExternalAddrConfirmed { address: addr });
1144 }
1145 ToSwarm::ExternalAddrExpired(addr) => {
1146 self.remove_external_address(&addr);
1147 self.pending_swarm_events
1148 .push_back(SwarmEvent::ExternalAddrExpired { address: addr });
1149 }
1150 ToSwarm::CloseConnection {
1151 peer_id,
1152 connection,
1153 } => match connection {
1154 CloseConnection::One(connection_id) => {
1155 if let Some(conn) = self.pool.get_established(connection_id) {
1156 conn.start_close();
1157 }
1158 }
1159 CloseConnection::All => {
1160 self.pool.disconnect(peer_id);
1161 }
1162 },
1163 ToSwarm::NewExternalAddrOfPeer { peer_id, address } => {
1164 self.behaviour
1165 .on_swarm_event(FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer {
1166 peer_id,
1167 addr: &address,
1168 }));
1169 self.pending_swarm_events
1170 .push_back(SwarmEvent::NewExternalAddrOfPeer { peer_id, address });
1171 }
1172 }
1173 }
1174
1175 #[tracing::instrument(level = "debug", name = "Swarm::poll", skip(self, cx))]
1179 fn poll_next_event(
1180 mut self: Pin<&mut Self>,
1181 cx: &mut Context<'_>,
1182 ) -> Poll<SwarmEvent<TBehaviour::ToSwarm>> {
1183 let this = &mut *self;
1186
1187 loop {
1198 if let Some(swarm_event) = this.pending_swarm_events.pop_front() {
1199 return Poll::Ready(swarm_event);
1200 }
1201
1202 match this.pending_handler_event.take() {
1203 Some((peer_id, handler, event)) => match handler {
1206 PendingNotifyHandler::One(conn_id) => {
1207 match this.pool.get_established(conn_id) {
1208 Some(conn) => match notify_one(conn, event, cx) {
1209 None => continue,
1210 Some(event) => {
1211 this.pending_handler_event = Some((peer_id, handler, event));
1212 }
1213 },
1214 None => continue,
1215 }
1216 }
1217 PendingNotifyHandler::Any(ids) => {
1218 match notify_any::<_, TBehaviour>(ids, &mut this.pool, event, cx) {
1219 None => continue,
1220 Some((event, ids)) => {
1221 let handler = PendingNotifyHandler::Any(ids);
1222 this.pending_handler_event = Some((peer_id, handler, event));
1223 }
1224 }
1225 }
1226 },
1227 None => match this.behaviour.poll(cx) {
1229 Poll::Pending => {}
1230 Poll::Ready(behaviour_event) => {
1231 this.handle_behaviour_event(behaviour_event);
1232
1233 continue;
1234 }
1235 },
1236 }
1237
1238 match this.pool.poll(cx) {
1240 Poll::Pending => {}
1241 Poll::Ready(pool_event) => {
1242 this.handle_pool_event(pool_event);
1243 continue;
1244 }
1245 }
1246
1247 match Pin::new(&mut this.transport).poll(cx) {
1249 Poll::Pending => {}
1250 Poll::Ready(transport_event) => {
1251 this.handle_transport_event(transport_event);
1252 continue;
1253 }
1254 }
1255
1256 return Poll::Pending;
1257 }
1258 }
1259}
1260
1261enum PendingNotifyHandler {
1268 One(ConnectionId),
1269 Any(SmallVec<[ConnectionId; 10]>),
1270}
1271
1272fn notify_one<THandlerInEvent>(
1281 conn: &mut EstablishedConnection<THandlerInEvent>,
1282 event: THandlerInEvent,
1283 cx: &mut Context<'_>,
1284) -> Option<THandlerInEvent> {
1285 match conn.poll_ready_notify_handler(cx) {
1286 Poll::Pending => Some(event),
1287 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
1289 let _ = conn.notify_handler(event);
1291 None
1292 }
1293 }
1294}
1295
1296fn notify_any<THandler, TBehaviour>(
1307 ids: SmallVec<[ConnectionId; 10]>,
1308 pool: &mut Pool<THandler>,
1309 event: THandlerInEvent<TBehaviour>,
1310 cx: &mut Context<'_>,
1311) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
1312where
1313 TBehaviour: NetworkBehaviour,
1314 THandler: ConnectionHandler<
1315 FromBehaviour = THandlerInEvent<TBehaviour>,
1316 ToBehaviour = THandlerOutEvent<TBehaviour>,
1317 >,
1318{
1319 let mut pending = SmallVec::new();
1320 let mut event = Some(event); for id in ids.into_iter() {
1322 if let Some(conn) = pool.get_established(id) {
1323 match conn.poll_ready_notify_handler(cx) {
1324 Poll::Pending => pending.push(id),
1325 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
1327 let e = event.take().expect("by (1),(2)");
1328 if let Err(e) = conn.notify_handler(e) {
1329 event = Some(e) } else {
1331 break;
1332 }
1333 }
1334 }
1335 }
1336 }
1337
1338 event.and_then(|e| {
1339 if !pending.is_empty() {
1340 Some((e, pending))
1341 } else {
1342 None
1343 }
1344 })
1345}
1346
1347impl<TBehaviour> futures::Stream for Swarm<TBehaviour>
1355where
1356 TBehaviour: NetworkBehaviour,
1357{
1358 type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>>;
1359
1360 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1361 self.as_mut().poll_next_event(cx).map(Some)
1362 }
1363}
1364
1365impl<TBehaviour> FusedStream for Swarm<TBehaviour>
1367where
1368 TBehaviour: NetworkBehaviour,
1369{
1370 fn is_terminated(&self) -> bool {
1371 false
1372 }
1373}
1374
1375pub struct Config {
1376 pool_config: PoolConfig,
1377}
1378
1379impl Config {
1380 pub fn with_executor(executor: impl Executor + Send + 'static) -> Self {
1383 Self {
1384 pool_config: PoolConfig::new(Some(Box::new(executor))),
1385 }
1386 }
1387
1388 #[doc(hidden)]
1389 pub fn without_executor() -> Self {
1391 Self {
1392 pool_config: PoolConfig::new(None),
1393 }
1394 }
1395
1396 #[cfg(feature = "wasm-bindgen")]
1406 pub fn with_wasm_executor() -> Self {
1407 Self::with_executor(crate::executor::WasmBindgenExecutor)
1408 }
1409
1410 #[cfg(all(
1412 feature = "tokio",
1413 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1414 ))]
1415 pub fn with_tokio_executor() -> Self {
1416 Self::with_executor(crate::executor::TokioExecutor)
1417 }
1418
1419 #[cfg(all(
1421 feature = "async-std",
1422 not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown"))
1423 ))]
1424 pub fn with_async_std_executor() -> Self {
1425 Self::with_executor(crate::executor::AsyncStdExecutor)
1426 }
1427
1428 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
1438 self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
1439 self
1440 }
1441
1442 pub fn with_per_connection_event_buffer_size(mut self, n: usize) -> Self {
1454 self.pool_config = self.pool_config.with_per_connection_event_buffer_size(n);
1455 self
1456 }
1457
1458 pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
1460 self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
1461 self
1462 }
1463
1464 pub fn with_substream_upgrade_protocol_override(
1475 mut self,
1476 v: ant_libp2p_core::upgrade::Version,
1477 ) -> Self {
1478 self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
1479 self
1480 }
1481
1482 pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self {
1492 self.pool_config = self.pool_config.with_max_negotiating_inbound_streams(v);
1493 self
1494 }
1495
1496 pub fn with_idle_connection_timeout(mut self, timeout: Duration) -> Self {
1500 self.pool_config.idle_connection_timeout = timeout;
1501 self
1502 }
1503}
1504
1505#[derive(Debug)]
1507pub enum DialError {
1508 LocalPeerId { endpoint: ConnectedPoint },
1510 NoAddresses,
1513 DialPeerConditionFalse(dial_opts::PeerCondition),
1516 Aborted,
1518 WrongPeerId {
1520 obtained: PeerId,
1521 endpoint: ConnectedPoint,
1522 },
1523 Denied { cause: ConnectionDenied },
1527 Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
1529}
1530
1531impl From<PendingOutboundConnectionError> for DialError {
1532 fn from(error: PendingOutboundConnectionError) -> Self {
1533 match error {
1534 PendingConnectionError::Aborted => DialError::Aborted,
1535 PendingConnectionError::WrongPeerId { obtained, endpoint } => {
1536 DialError::WrongPeerId { obtained, endpoint }
1537 }
1538 PendingConnectionError::LocalPeerId { endpoint } => DialError::LocalPeerId { endpoint },
1539 PendingConnectionError::Transport(e) => DialError::Transport(e),
1540 }
1541 }
1542}
1543
1544impl fmt::Display for DialError {
1545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1546 match self {
1547 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1548 DialError::LocalPeerId { endpoint } => write!(
1549 f,
1550 "Dial error: tried to dial local peer id at {endpoint:?}."
1551 ),
1552 DialError::DialPeerConditionFalse(PeerCondition::Disconnected) => write!(f, "Dial error: dial condition was configured to only happen when disconnected (`PeerCondition::Disconnected`), but node is already connected, thus cancelling new dial."),
1553 DialError::DialPeerConditionFalse(PeerCondition::NotDialing) => write!(f, "Dial error: dial condition was configured to only happen if there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but a dial is in progress, thus cancelling new dial."),
1554 DialError::DialPeerConditionFalse(PeerCondition::DisconnectedAndNotDialing) => write!(f, "Dial error: dial condition was configured to only happen when both disconnected (`PeerCondition::Disconnected`) and there is currently no ongoing dialing attempt (`PeerCondition::NotDialing`), but node is already connected or dial is in progress, thus cancelling new dial."),
1555 DialError::DialPeerConditionFalse(PeerCondition::Always) => unreachable!("Dial peer condition is by definition true."),
1556 DialError::Aborted => write!(
1557 f,
1558 "Dial error: Pending connection attempt has been aborted."
1559 ),
1560 DialError::WrongPeerId { obtained, endpoint } => write!(
1561 f,
1562 "Dial error: Unexpected peer ID {obtained} at {endpoint:?}."
1563 ),
1564 DialError::Transport(errors) => {
1565 write!(f, "Failed to negotiate transport protocol(s): [")?;
1566
1567 for (addr, error) in errors {
1568 write!(f, "({addr}")?;
1569 print_error_chain(f, error)?;
1570 write!(f, ")")?;
1571 }
1572 write!(f, "]")?;
1573
1574 Ok(())
1575 }
1576 DialError::Denied { .. } => {
1577 write!(f, "Dial error")
1578 }
1579 }
1580 }
1581}
1582
/// Writes `e` and then every error reachable through `source()`, each one prefixed with `": "`.
///
/// Stops at the first write failure and returns it.
fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::Result {
    let mut link = Some(e);

    // Walk the chain from the outermost error down to the root cause.
    while let Some(current) = link {
        write!(f, ": {current}")?;
        link = current.source();
    }

    Ok(())
}
1592
1593impl error::Error for DialError {
1594 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1595 match self {
1596 DialError::LocalPeerId { .. } => None,
1597 DialError::NoAddresses => None,
1598 DialError::DialPeerConditionFalse(_) => None,
1599 DialError::Aborted => None,
1600 DialError::WrongPeerId { .. } => None,
1601 DialError::Transport(_) => None,
1602 DialError::Denied { cause } => Some(cause),
1603 }
1604 }
1605}
1606
#[derive(Debug)]
/// Possible errors when handling an incoming connection.
pub enum ListenError {
    /// The pending connection attempt has been aborted.
    Aborted,
    /// The peer ID obtained on the connection is not the one that was expected.
    WrongPeerId {
        /// The peer ID that was actually obtained.
        obtained: PeerId,
        /// The endpoint at which the unexpected peer ID was obtained.
        endpoint: ConnectedPoint,
    },
    /// The connection turned out to involve the local peer ID.
    LocalPeerId {
        /// The endpoint at which the local peer ID was encountered.
        endpoint: ConnectedPoint,
    },
    /// The connection was denied.
    Denied {
        /// The reason for the denial; also exposed via `Error::source`.
        cause: ConnectionDenied,
    },
    /// A transport-level failure; the wrapped error is exposed via `Error::source`.
    Transport(TransportError<io::Error>),
}
1627
1628impl From<PendingInboundConnectionError> for ListenError {
1629 fn from(error: PendingInboundConnectionError) -> Self {
1630 match error {
1631 PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner),
1632 PendingInboundConnectionError::Aborted => ListenError::Aborted,
1633 PendingInboundConnectionError::WrongPeerId { obtained, endpoint } => {
1634 ListenError::WrongPeerId { obtained, endpoint }
1635 }
1636 PendingInboundConnectionError::LocalPeerId { endpoint } => {
1637 ListenError::LocalPeerId { endpoint }
1638 }
1639 }
1640 }
1641}
1642
1643impl fmt::Display for ListenError {
1644 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1645 match self {
1646 ListenError::Aborted => write!(
1647 f,
1648 "Listen error: Pending connection attempt has been aborted."
1649 ),
1650 ListenError::WrongPeerId { obtained, endpoint } => write!(
1651 f,
1652 "Listen error: Unexpected peer ID {obtained} at {endpoint:?}."
1653 ),
1654 ListenError::Transport(_) => {
1655 write!(f, "Listen error: Failed to negotiate transport protocol(s)")
1656 }
1657 ListenError::Denied { cause } => {
1658 write!(f, "Listen error: Denied: {cause}")
1659 }
1660 ListenError::LocalPeerId { endpoint } => {
1661 write!(f, "Listen error: Local peer ID at {endpoint:?}.")
1662 }
1663 }
1664 }
1665}
1666
1667impl error::Error for ListenError {
1668 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1669 match self {
1670 ListenError::WrongPeerId { .. } => None,
1671 ListenError::Transport(err) => Some(err),
1672 ListenError::Aborted => None,
1673 ListenError::Denied { cause } => Some(cause),
1674 ListenError::LocalPeerId { .. } => None,
1675 }
1676 }
1677}
1678
#[derive(Debug)]
/// A connection was denied.
///
/// The reason is stored as a type-erased error; use [`ConnectionDenied::downcast`] or
/// [`ConnectionDenied::downcast_ref`] to get back at a concrete error type.
pub struct ConnectionDenied {
    inner: Box<dyn error::Error + Send + Sync + 'static>,
}

impl ConnectionDenied {
    /// Wraps `cause` as the reason for denying a connection.
    pub fn new(cause: impl Into<Box<dyn error::Error + Send + Sync + 'static>>) -> Self {
        let inner: Box<dyn error::Error + Send + Sync + 'static> = cause.into();

        Self { inner }
    }

    /// Attempts to recover the concrete cause of type `E`.
    ///
    /// Returns the original `ConnectionDenied` unchanged in `Err` when the cause is of a
    /// different type.
    pub fn downcast<E>(self) -> Result<E, Self>
    where
        E: error::Error + Send + Sync + 'static,
    {
        match self.inner.downcast::<E>() {
            Ok(concrete) => Ok(*concrete),
            Err(inner) => Err(Self { inner }),
        }
    }

    /// Borrows the cause as `E`, or returns `None` when the cause is of a different type.
    pub fn downcast_ref<E>(&self) -> Option<&E>
    where
        E: error::Error + Send + Sync + 'static,
    {
        let erased: &(dyn error::Error + Send + Sync + 'static) = &*self.inner;

        erased.downcast_ref::<E>()
    }
}
1716
1717impl fmt::Display for ConnectionDenied {
1718 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1719 write!(f, "connection denied")
1720 }
1721}
1722
1723impl error::Error for ConnectionDenied {
1724 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1725 Some(self.inner.as_ref())
1726 }
1727}
1728
#[derive(Clone, Debug)]
/// A snapshot of network-level figures: a peer count plus the connection counters.
///
/// NOTE(review): where this snapshot is produced is outside this chunk — confirm whether it is
/// a point-in-time copy.
pub struct NetworkInfo {
    /// Number of peers, as returned by [`NetworkInfo::num_peers`].
    num_peers: usize,
    /// Connection counters, as returned by [`NetworkInfo::connection_counters`].
    connection_counters: ConnectionCounters,
}
1737
impl NetworkInfo {
    /// Returns the stored number of peers.
    ///
    /// NOTE(review): presumably the number of currently connected peers — confirm at the
    /// construction site.
    pub fn num_peers(&self) -> usize {
        self.num_peers
    }

    /// Returns a reference to the stored connection counters.
    pub fn connection_counters(&self) -> &ConnectionCounters {
        &self.connection_counters
    }
}
1750
1751#[cfg(test)]
1752mod tests {
1753 use ant_libp2p_core::{
1754 multiaddr,
1755 multiaddr::multiaddr,
1756 transport,
1757 transport::{memory::MemoryTransportError, PortUse, TransportEvent},
1758 upgrade, Endpoint,
1759 };
1760 use libp2p_identity as identity;
1761 use libp2p_plaintext as plaintext;
1762 use libp2p_yamux as yamux;
1763 use quickcheck::*;
1764
1765 use super::*;
1766 use crate::test::{CallTraceBehaviour, MockBehaviour};
1767
    /// Phase of the poll-driven state machines used by the disconnect tests below.
    enum State {
        /// Waiting for the two swarms to report the expected number of connections.
        Connecting,
        /// A disconnect has been requested; waiting for it to be observed.
        Disconnecting,
    }
1774
1775 fn new_test_swarm(
1776 config: Config,
1777 ) -> Swarm<CallTraceBehaviour<MockBehaviour<dummy::ConnectionHandler, ()>>> {
1778 let id_keys = identity::Keypair::generate_ed25519();
1779 let local_public_key = id_keys.public();
1780 let transport = transport::MemoryTransport::default()
1781 .upgrade(upgrade::Version::V1)
1782 .authenticate(plaintext::Config::new(&id_keys))
1783 .multiplex(yamux::Config::default())
1784 .boxed();
1785 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(dummy::ConnectionHandler));
1786
1787 Swarm::new(
1788 transport,
1789 behaviour,
1790 local_public_key.into(),
1791 config.with_idle_connection_timeout(Duration::from_secs(5)),
1792 )
1793 }
1794
1795 fn swarms_connected<TBehaviour>(
1796 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1797 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1798 num_connections: usize,
1799 ) -> bool
1800 where
1801 TBehaviour: NetworkBehaviour,
1802 THandlerOutEvent<TBehaviour>: Clone,
1803 {
1804 swarm1
1805 .behaviour()
1806 .num_connections_to_peer(*swarm2.local_peer_id())
1807 == num_connections
1808 && swarm2
1809 .behaviour()
1810 .num_connections_to_peer(*swarm1.local_peer_id())
1811 == num_connections
1812 && swarm1.is_connected(swarm2.local_peer_id())
1813 && swarm2.is_connected(swarm1.local_peer_id())
1814 }
1815
1816 fn swarms_disconnected<TBehaviour>(
1817 swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
1818 swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
1819 ) -> bool
1820 where
1821 TBehaviour: NetworkBehaviour,
1822 THandlerOutEvent<TBehaviour>: Clone,
1823 {
1824 swarm1
1825 .behaviour()
1826 .num_connections_to_peer(*swarm2.local_peer_id())
1827 == 0
1828 && swarm2
1829 .behaviour()
1830 .num_connections_to_peer(*swarm1.local_peer_id())
1831 == 0
1832 && !swarm1.is_connected(swarm2.local_peer_id())
1833 && !swarm2.is_connected(swarm1.local_peer_id())
1834 }
1835
    #[tokio::test]
    /// Establishes `num_connections` connections from swarm1 to swarm2, then has swarm2 call
    /// `Swarm::disconnect_peer_id` for swarm1 and waits until both sides report no connections.
    ///
    /// Afterwards swarm2 redials swarm1 the same number of times; the test completes once both
    /// sides are fully connected again.
    async fn test_swarm_disconnect() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Flipped once the first disconnect has been observed and the redial was issued.
        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms on every iteration before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Second time fully connected: the round trip is complete.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        swarm2
                            .disconnect_peer_id(swarm1_id)
                            .expect("Error disconnecting");
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        // NOTE(review): `reconnected` is only set below, together with the
                        // switch back to `Connecting`, so this early return looks unreachable.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                    }
                }
            }

            // Only yield once neither swarm has anything further to report.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1898
    #[tokio::test]
    /// Establishes `num_connections` connections from swarm1 to swarm2, then has swarm2's
    /// behaviour emit `ToSwarm::CloseConnection` with `CloseConnection::All` for swarm1 and waits
    /// until both sides report no connections.
    ///
    /// Afterwards swarm2 redials swarm1 the same number of times; the test completes once both
    /// sides are fully connected again.
    async fn test_behaviour_disconnect_all() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        // Flipped once the first disconnect has been observed and the redial was issued.
        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        future::poll_fn(move |cx| loop {
            // Drive both swarms on every iteration before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        // Second time fully connected: the round trip is complete.
                        if reconnected {
                            return Poll::Ready(());
                        }
                        // Queue the close request as the mock behaviour's next action.
                        swarm2
                            .behaviour
                            .inner()
                            .next_action
                            .replace(ToSwarm::CloseConnection {
                                peer_id: swarm1_id,
                                connection: CloseConnection::All,
                            });
                        state = State::Disconnecting;
                        // Poll again right away so the queued action can be picked up.
                        continue;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                        // Poll again right away so the new dials start making progress.
                        continue;
                    }
                }
            }

            // Only yield once neither swarm has anything further to report.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
1966
    #[tokio::test]
    /// Establishes `num_connections` connections from swarm1 to swarm2, then has swarm2's
    /// behaviour emit `ToSwarm::CloseConnection` with `CloseConnection::One` for a single
    /// connection.
    ///
    /// The test completes once both sides recorded exactly one closed connection and the one
    /// recorded by swarm2 is the connection that was asked to be closed.
    async fn test_behaviour_disconnect_one() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;
        // ID of the connection that swarm2 was asked to close, once chosen.
        let mut disconnected_conn_id = None;

        future::poll_fn(move |cx| loop {
            // Drive both swarms on every iteration before inspecting their state.
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        disconnected_conn_id = {
                            // Pick the connection in the middle of the recorded establishments.
                            let conn_id =
                                swarm2.behaviour.on_connection_established[num_connections / 2].1;
                            swarm2.behaviour.inner().next_action.replace(
                                ToSwarm::CloseConnection {
                                    peer_id: swarm1_id,
                                    connection: CloseConnection::One(conn_id),
                                },
                            );
                            Some(conn_id)
                        };
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    for s in &[&swarm1, &swarm2] {
                        // Every recorded close must have left other connections open.
                        assert!(s
                            .behaviour
                            .on_connection_closed
                            .iter()
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
                        assert_eq!(s.behaviour.on_connection_established.len(), num_connections);
                        s.behaviour.assert_connected(num_connections, 1);
                    }
                    if [&swarm1, &swarm2]
                        .iter()
                        .all(|s| s.behaviour.on_connection_closed.len() == 1)
                    {
                        let conn_id = swarm2.behaviour.on_connection_closed[0].1;
                        assert_eq!(Some(conn_id), disconnected_conn_id);
                        return Poll::Ready(());
                    }
                }
            }

            // Only yield once neither swarm has anything further to report.
            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        })
        .await
    }
2042
2043 #[test]
2044 fn concurrent_dialing() {
2045 #[derive(Clone, Debug)]
2046 struct DialConcurrencyFactor(NonZeroU8);
2047
2048 impl Arbitrary for DialConcurrencyFactor {
2049 fn arbitrary(g: &mut Gen) -> Self {
2050 Self(NonZeroU8::new(g.gen_range(1..11)).unwrap())
2051 }
2052 }
2053
2054 fn prop(concurrency_factor: DialConcurrencyFactor) {
2055 tokio::runtime::Runtime::new().unwrap().block_on(async {
2056 let mut swarm = new_test_swarm(
2057 Config::with_tokio_executor()
2058 .with_dial_concurrency_factor(concurrency_factor.0),
2059 );
2060
2061 let num_listen_addrs = concurrency_factor.0.get() + 2;
2065 let mut listen_addresses = Vec::new();
2066 let mut transports = Vec::new();
2067 for _ in 0..num_listen_addrs {
2068 let mut transport = transport::MemoryTransport::default().boxed();
2069 transport
2070 .listen_on(ListenerId::next(), "/memory/0".parse().unwrap())
2071 .unwrap();
2072
2073 match transport.select_next_some().await {
2074 TransportEvent::NewAddress { listen_addr, .. } => {
2075 listen_addresses.push(listen_addr);
2076 }
2077 _ => panic!("Expected `NewListenAddr` event."),
2078 }
2079
2080 transports.push(transport);
2081 }
2082
2083 swarm
2086 .dial(
2087 DialOpts::peer_id(PeerId::random())
2088 .addresses(listen_addresses)
2089 .build(),
2090 )
2091 .unwrap();
2092 for mut transport in transports.into_iter() {
2093 match futures::future::select(transport.select_next_some(), swarm.next()).await
2094 {
2095 future::Either::Left((TransportEvent::Incoming { .. }, _)) => {}
2096 future::Either::Left(_) => {
2097 panic!("Unexpected transport event.")
2098 }
2099 future::Either::Right((e, _)) => {
2100 panic!("Expect swarm to not emit any event {e:?}")
2101 }
2102 }
2103 }
2104
2105 match swarm.next().await.unwrap() {
2106 SwarmEvent::OutgoingConnectionError { .. } => {}
2107 e => panic!("Unexpected swarm event {e:?}"),
2108 }
2109 })
2110 }
2111
2112 QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
2113 }
2114
    #[tokio::test]
    /// Dials swarm1's listen address with a random peer ID appended and expects swarm2 to report
    /// `DialError::WrongPeerId` carrying swarm1's actual peer ID and the dialed endpoint.
    async fn invalid_peer_id() {
        let mut swarm1 = new_test_swarm(Config::with_tokio_executor());
        let mut swarm2 = new_test_swarm(Config::with_tokio_executor());

        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();

        // Wait for swarm1 to report the concrete address it is listening on.
        let address = future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // A peer ID that does not belong to swarm1.
        let other_id = PeerId::random();
        let other_addr = address.with(multiaddr::Protocol::P2p(other_id));

        swarm2.dial(other_addr.clone()).unwrap();

        let (peer_id, error) = future::poll_fn(|cx| {
            // Poll swarm1 as well so the listening side keeps making progress; whatever it
            // yields is discarded.
            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
                swarm1.poll_next_unpin(cx)
            {}

            match swarm2.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                    peer_id, error, ..
                })) => Poll::Ready((peer_id, error)),
                Poll::Ready(x) => panic!("unexpected {x:?}"),
                Poll::Pending => Poll::Pending,
            }
        })
        .await;
        assert_eq!(peer_id.unwrap(), other_id);
        match error {
            DialError::WrongPeerId { obtained, endpoint } => {
                assert_eq!(obtained, *swarm1.local_peer_id());
                assert_eq!(
                    endpoint,
                    ConnectedPoint::Dialer {
                        address: other_addr,
                        role_override: Endpoint::Dialer,
                        port_use: PortUse::Reuse,
                    }
                );
            }
            x => panic!("wrong error {x:?}"),
        }
    }
2167
    #[tokio::test]
    /// Has a swarm dial its own listen address and expects exactly one
    /// `OutgoingConnectionError` with `DialError::LocalPeerId` and exactly one
    /// `IncomingConnectionError` for that address, in either order.
    async fn dial_self() {
        let mut swarm = new_test_swarm(Config::with_tokio_executor());
        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();

        // Wait for the swarm to report the concrete address it is listening on.
        let local_address = future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
            Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => Poll::Ready(address),
            Poll::Pending => Poll::Pending,
            _ => panic!("Was expecting the listen address to be reported"),
        })
        .await;

        // NOTE(review): presumably clears the swarm's record of its own listen addresses so that
        // the dial to itself is actually attempted rather than filtered out — confirm against
        // `Swarm::dial`.
        swarm.listened_addrs.clear();
        swarm.dial(local_address.clone()).unwrap();

        let mut got_dial_err = false;
        let mut got_inc_err = false;
        future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
            loop {
                match swarm.poll_next_unpin(cx) {
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                        peer_id,
                        error: DialError::LocalPeerId { .. },
                        ..
                    })) => {
                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
                        // The dial error must be reported only once.
                        assert!(!got_dial_err);
                        got_dial_err = true;
                        if got_inc_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
                        local_addr, ..
                    })) => {
                        // The incoming error must be reported only once.
                        assert!(!got_inc_err);
                        assert_eq!(local_addr, local_address);
                        got_inc_err = true;
                        if got_dial_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
                        assert_eq!(local_addr, local_address);
                    }
                    Poll::Ready(ev) => {
                        panic!("Unexpected event: {ev:?}")
                    }
                    Poll::Pending => break Poll::Pending,
                }
            }
        })
        .await
        .unwrap();
    }
2237
2238 #[tokio::test]
2239 async fn dial_self_by_id() {
2240 let swarm = new_test_swarm(Config::with_tokio_executor());
2243 let peer_id = *swarm.local_peer_id();
2244 assert!(!swarm.is_connected(&peer_id));
2245 }
2246
    #[tokio::test]
    /// Dials a random peer on several TCP and UDP addresses through the in-memory test swarm and
    /// expects a single `DialError::Transport` listing every address, each with the target's
    /// peer ID appended.
    async fn multiple_addresses_err() {
        let target = PeerId::random();

        let mut swarm = new_test_swarm(Config::with_tokio_executor());

        // A set, so that randomly colliding ports collapse into a single address.
        let addresses = HashSet::from([
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
            multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
            multiaddr![Udp(rand::random::<u16>())],
        ]);

        swarm
            .dial(
                DialOpts::peer_id(target)
                    .addresses(addresses.iter().cloned().collect())
                    .build(),
            )
            .unwrap();

        match swarm.next().await.unwrap() {
            SwarmEvent::OutgoingConnectionError {
                peer_id,
                error: DialError::Transport(errors),
                ..
            } => {
                assert_eq!(target, peer_id.unwrap());

                let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
                // NOTE(review): the comparison is order-sensitive; it assumes the errors come
                // back in the same order the set was iterated when building the dial options.
                let expected_addresses = addresses
                    .into_iter()
                    .map(|addr| addr.with(multiaddr::Protocol::P2p(target)))
                    .collect::<Vec<_>>();

                assert_eq!(expected_addresses, failed_addresses);
            }
            e => panic!("Unexpected event: {e:?}"),
        }
    }
2294
    #[tokio::test]
    /// Starts a dial to a listening peer, immediately calls `disconnect_peer_id` for that peer
    /// and expects the dialer's next event to be an `OutgoingConnectionError` with
    /// `DialError::Aborted`.
    async fn aborting_pending_connection_surfaces_error() {
        // Ignore the result: a subscriber may already have been installed by another test.
        let _ = tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init();

        let mut dialer = new_test_swarm(Config::with_tokio_executor());
        let mut listener = new_test_swarm(Config::with_tokio_executor());

        let listener_peer_id = *listener.local_peer_id();
        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
        let listener_address = match listener.next().await.unwrap() {
            SwarmEvent::NewListenAddr { address, .. } => address,
            e => panic!("Unexpected network event: {e:?}"),
        };

        dialer
            .dial(
                DialOpts::peer_id(listener_peer_id)
                    .addresses(vec![listener_address])
                    .build(),
            )
            .unwrap();

        // The dialer has not been polled yet, so the connection cannot be established; the call
        // is expected to fail. NOTE(review): presumably it also aborts the pending dial, which is
        // what the event below checks — confirm against `Swarm::disconnect_peer_id`.
        dialer
            .disconnect_peer_id(listener_peer_id)
            .expect_err("Expect peer to not yet be connected.");

        match dialer.next().await.unwrap() {
            SwarmEvent::OutgoingConnectionError {
                error: DialError::Aborted,
                ..
            } => {}
            e => panic!("Unexpected swarm event {e:?}."),
        }
    }
2331
2332 #[test]
2333 fn dial_error_prints_sources() {
2334 let error = DialError::Transport(vec![(
2336 "/ip4/127.0.0.1/tcp/80".parse().unwrap(),
2337 TransportError::Other(io::Error::new(
2338 io::ErrorKind::Other,
2339 MemoryTransportError::Unreachable,
2340 )),
2341 )]);
2342
2343 let string = format!("{error}");
2344
2345 assert_eq!("Failed to negotiate transport protocol(s): [(/ip4/127.0.0.1/tcp/80: : No listener on the given port.)]", string)
2348 }
2349}