1mod behaviour;
57mod registry;
58#[cfg(test)]
59mod test;
60mod upgrade;
61
62pub mod protocols_handler;
63pub mod toggle;
64
65pub use behaviour::{
66 NetworkBehaviour,
67 NetworkBehaviourAction,
68 NetworkBehaviourEventProcess,
69 PollParameters,
70 NotifyHandler,
71 DialPeerCondition
72};
73pub use protocols_handler::{
74 IntoProtocolsHandler,
75 IntoProtocolsHandlerSelect,
76 KeepAlive,
77 ProtocolsHandler,
78 ProtocolsHandlerEvent,
79 ProtocolsHandlerSelect,
80 ProtocolsHandlerUpgrErr,
81 OneShotHandler,
82 OneShotHandlerConfig,
83 SubstreamProtocol
84};
85pub use registry::{AddressScore, AddressRecord, AddAddressResult};
86
87use protocols_handler::{
88 NodeHandlerWrapperBuilder,
89 NodeHandlerWrapperError,
90};
91use futures::{
92 prelude::*,
93 executor::ThreadPoolBuilder,
94 stream::FusedStream,
95};
96use libp2p_core::{
97 Executor,
98 Transport,
99 Multiaddr,
100 Negotiated,
101 PeerId,
102 connection::{
103 ConnectionError,
104 ConnectionId,
105 ConnectionLimit,
106 ConnectedPoint,
107 EstablishedConnection,
108 IntoConnectionHandler,
109 ListenerId,
110 PendingConnectionError,
111 Substream
112 },
113 transport::{self, TransportError},
114 muxing::StreamMuxerBox,
115 network::{
116 ConnectionLimits,
117 Network,
118 NetworkInfo,
119 NetworkEvent,
120 NetworkConfig,
121 peer::ConnectedPeer,
122 },
123 upgrade::{ProtocolName},
124};
125use registry::{Addresses, AddressIntoIter};
126use smallvec::SmallVec;
127use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
128use std::collections::HashSet;
129use std::num::{NonZeroU32, NonZeroUsize};
130use upgrade::UpgradeInfoSend as _;
131
132pub type Swarm<TBehaviour> = ExpandedSwarm<
134 TBehaviour,
135 <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
136 <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
137 <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
138>;
139
140pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
145
146#[derive(Debug)]
148pub enum SwarmEvent<TBvEv, THandleErr> {
149 Behaviour(TBvEv),
151 ConnectionEstablished {
153 peer_id: PeerId,
155 endpoint: ConnectedPoint,
157 num_established: NonZeroU32,
160 },
161 ConnectionClosed {
164 peer_id: PeerId,
166 endpoint: ConnectedPoint,
168 num_established: u32,
170 cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
173 },
174 IncomingConnection {
181 local_addr: Multiaddr,
185 send_back_addr: Multiaddr,
187 },
188 IncomingConnectionError {
193 local_addr: Multiaddr,
197 send_back_addr: Multiaddr,
199 error: PendingConnectionError<io::Error>,
201 },
202 BannedPeer {
204 peer_id: PeerId,
206 endpoint: ConnectedPoint,
208 },
209 UnreachableAddr {
211 peer_id: PeerId,
213 address: Multiaddr,
215 error: PendingConnectionError<io::Error>,
217 attempts_remaining: u32,
219 },
220 UnknownPeerUnreachableAddr {
224 address: Multiaddr,
226 error: PendingConnectionError<io::Error>,
228 },
229 NewListenAddr(Multiaddr),
231 ExpiredListenAddr(Multiaddr),
233 ListenerClosed {
235 addresses: Vec<Multiaddr>,
239 reason: Result<(), io::Error>,
242 },
243 ListenerError {
245 error: io::Error,
247 },
248 Dialing(PeerId),
255}
256
257pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
259where
260 THandler: IntoProtocolsHandler,
261{
262 network: Network<
263 transport::Boxed<(PeerId, StreamMuxerBox)>,
264 TInEvent,
265 TOutEvent,
266 NodeHandlerWrapperBuilder<THandler>,
267 >,
268
269 behaviour: TBehaviour,
272
273 supported_protocols: SmallVec<[Vec<u8>; 16]>,
275
276 listened_addrs: SmallVec<[Multiaddr; 8]>,
278
279 external_addrs: Addresses,
282
283 banned_peers: HashSet<PeerId>,
285
286 pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,
290
291 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
293}
294
295impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
296 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
297where
298 THandler: IntoProtocolsHandler,
299{
300 type Target = TBehaviour;
301
302 fn deref(&self) -> &Self::Target {
303 &self.behaviour
304 }
305}
306
307impl<TBehaviour, TInEvent, TOutEvent, THandler> DerefMut for
308 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
309where
310 THandler: IntoProtocolsHandler,
311{
312 fn deref_mut(&mut self) -> &mut Self::Target {
313 &mut self.behaviour
314 }
315}
316
317impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
318 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
319where
320 THandler: IntoProtocolsHandler,
321{
322}
323
324impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
325 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
326where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
327 TInEvent: Send + 'static,
328 TOutEvent: Send + 'static,
329 THandler: IntoProtocolsHandler + Send + 'static,
330 THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
331 THandleErr: error::Error + Send + 'static,
332{
333 pub fn new(
335 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
336 behaviour: TBehaviour,
337 local_peer_id: PeerId
338 ) -> Self {
339 SwarmBuilder::new(transport, behaviour, local_peer_id).build()
340 }
341
342 pub fn network_info(me: &Self) -> NetworkInfo {
344 me.network.info()
345 }
346
347 pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
351 me.network.listen_on(addr)
352 }
353
354 pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
358 me.network.remove_listener(id)
359 }
360
361 pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
363 let handler = me.behaviour.new_handler()
364 .into_node_handler_builder()
365 .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
366 me.network.dial(&addr, handler).map(|_id| ())
367 }
368
369 pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
371 if me.banned_peers.contains(peer_id) {
372 me.behaviour.inject_dial_failure(peer_id);
373 return Err(DialError::Banned)
374 }
375
376 let self_listening = &me.listened_addrs;
377 let mut addrs = me.behaviour.addresses_of_peer(peer_id)
378 .into_iter()
379 .filter(|a| !self_listening.contains(a));
380
381 let result =
382 if let Some(first) = addrs.next() {
383 let handler = me.behaviour.new_handler()
384 .into_node_handler_builder()
385 .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
386 me.network.peer(*peer_id)
387 .dial(first, addrs, handler)
388 .map(|_| ())
389 .map_err(DialError::ConnectionLimit)
390 } else {
391 Err(DialError::NoAddresses)
392 };
393
394 if let Err(error) = &result {
395 log::debug!(
396 "New dialing attempt to peer {:?} failed: {:?}.",
397 peer_id, error);
398 me.behaviour.inject_dial_failure(&peer_id);
399 }
400
401 result
402 }
403
404 pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
406 me.network.listen_addrs()
407 }
408
409 pub fn local_peer_id(me: &Self) -> &PeerId {
411 me.network.local_peer_id()
412 }
413
414 pub fn external_addresses(me: &Self) -> impl Iterator<Item = &AddressRecord> {
418 me.external_addrs.iter()
419 }
420
421 pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
436 me.external_addrs.add(a, s)
437 }
438
439 pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool {
446 me.external_addrs.remove(addr)
447 }
448
449 pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
454 if me.banned_peers.insert(peer_id) {
455 if let Some(peer) = me.network.peer(peer_id).into_connected() {
456 peer.disconnect();
457 }
458 }
459 }
460
461 pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
463 me.banned_peers.remove(&peer_id);
464 }
465
466 pub fn is_connected(me: &Self, peer_id: &PeerId) -> bool {
468 me.network.is_connected(peer_id)
469 }
470
471 pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
475 future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
476 }
477
478 pub async fn next(&mut self) -> TBehaviour::OutEvent {
480 future::poll_fn(move |cx| {
481 loop {
482 let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
483 if let SwarmEvent::Behaviour(event) = event {
484 return Poll::Ready(event);
485 }
486 }
487 }).await
488 }
489
490 fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
494 -> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
495 {
496 let this = &mut *self;
499
500 loop {
501 let mut network_not_ready = false;
502
503 match this.network.poll(cx) {
505 Poll::Pending => network_not_ready = true,
506 Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
507 let peer = connection.peer_id();
508 let connection = connection.id();
509 this.behaviour.inject_event(peer, connection, event);
510 },
511 Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
512 let peer = connection.peer_id();
513 let connection = connection.id();
514 this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
515 },
516 Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
517 let peer_id = connection.peer_id();
518 let endpoint = connection.endpoint().clone();
519 if this.banned_peers.contains(&peer_id) {
520 this.network.peer(peer_id)
521 .into_connected()
522 .expect("the Network just notified us that we were connected; QED")
523 .disconnect();
524 return Poll::Ready(SwarmEvent::BannedPeer {
525 peer_id,
526 endpoint,
527 });
528 } else {
529 log::debug!("Connection established: {:?}; Total (peer): {}.",
530 connection.connected(), num_established);
531 let endpoint = connection.endpoint().clone();
532 this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
533 if num_established.get() == 1 {
534 this.behaviour.inject_connected(&peer_id);
535 }
536 return Poll::Ready(SwarmEvent::ConnectionEstablished {
537 peer_id, num_established, endpoint
538 });
539 }
540 },
541 Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
542 if let Some(error) = error.as_ref() {
543 log::debug!("Connection {:?} closed: {:?}", connected, error);
544 } else {
545 log::debug!("Connection {:?} closed (active close).", connected);
546 }
547 let peer_id = connected.peer_id;
548 let endpoint = connected.endpoint;
549 this.behaviour.inject_connection_closed(&peer_id, &id, &endpoint);
550 if num_established == 0 {
551 this.behaviour.inject_disconnected(&peer_id);
552 }
553 return Poll::Ready(SwarmEvent::ConnectionClosed {
554 peer_id,
555 endpoint,
556 cause: error,
557 num_established,
558 });
559 },
560 Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
561 let handler = this.behaviour.new_handler()
562 .into_node_handler_builder()
563 .with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
564 let local_addr = connection.local_addr.clone();
565 let send_back_addr = connection.send_back_addr.clone();
566 if let Err(e) = this.network.accept(connection, handler) {
567 log::warn!("Incoming connection rejected: {:?}", e);
568 }
569 return Poll::Ready(SwarmEvent::IncomingConnection {
570 local_addr,
571 send_back_addr,
572 });
573 },
574 Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
575 log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
576 if !this.listened_addrs.contains(&listen_addr) {
577 this.listened_addrs.push(listen_addr.clone())
578 }
579 this.behaviour.inject_new_listen_addr(&listen_addr);
580 return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
581 }
582 Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
583 log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
584 this.listened_addrs.retain(|a| a != &listen_addr);
585 this.behaviour.inject_expired_listen_addr(&listen_addr);
586 return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
587 }
588 Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
589 log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
590 for addr in addresses.iter() {
591 this.behaviour.inject_expired_listen_addr(addr);
592 }
593 this.behaviour.inject_listener_closed(listener_id, match &reason {
594 Ok(()) => Ok(()),
595 Err(err) => Err(err),
596 });
597 return Poll::Ready(SwarmEvent::ListenerClosed {
598 addresses,
599 reason,
600 });
601 }
602 Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
603 this.behaviour.inject_listener_error(listener_id, &error);
604 return Poll::Ready(SwarmEvent::ListenerError {
605 error,
606 });
607 },
608 Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
609 log::debug!("Incoming connection failed: {:?}", error);
610 return Poll::Ready(SwarmEvent::IncomingConnectionError {
611 local_addr,
612 send_back_addr,
613 error,
614 });
615 },
616 Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
617 log::debug!(
618 "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
619 peer_id, multiaddr, error, attempts_remaining);
620 this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
621 if attempts_remaining == 0 {
622 this.behaviour.inject_dial_failure(&peer_id);
623 }
624 return Poll::Ready(SwarmEvent::UnreachableAddr {
625 peer_id,
626 address: multiaddr,
627 error,
628 attempts_remaining,
629 });
630 },
631 Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
632 log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
633 multiaddr, error);
634 this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
635 return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
636 address: multiaddr,
637 error,
638 });
639 },
640 }
641
642 if let Some((peer_id, handler, event)) = this.pending_event.take() {
648 if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
649 match handler {
650 PendingNotifyHandler::One(conn_id) =>
651 if let Some(mut conn) = peer.connection(conn_id) {
652 if let Some(event) = notify_one(&mut conn, event, cx) {
653 this.pending_event = Some((peer_id, handler, event));
654 return Poll::Pending
655 }
656 },
657 PendingNotifyHandler::Any(ids) => {
658 if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
659 let handler = PendingNotifyHandler::Any(ids);
660 this.pending_event = Some((peer_id, handler, event));
661 return Poll::Pending
662 }
663 }
664 }
665 }
666 }
667
668 debug_assert!(this.pending_event.is_none());
669
670 let behaviour_poll = {
671 let mut parameters = SwarmPollParameters {
672 local_peer_id: &mut this.network.local_peer_id(),
673 supported_protocols: &this.supported_protocols,
674 listened_addrs: &this.listened_addrs,
675 external_addrs: &this.external_addrs
676 };
677 this.behaviour.poll(cx, &mut parameters)
678 };
679
680 match behaviour_poll {
681 Poll::Pending if network_not_ready => return Poll::Pending,
682 Poll::Pending => (),
683 Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
684 return Poll::Ready(SwarmEvent::Behaviour(event))
685 },
686 Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
687 let _ = ExpandedSwarm::dial_addr(&mut *this, address);
688 },
689 Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
690 if this.banned_peers.contains(&peer_id) {
691 this.behaviour.inject_dial_failure(&peer_id);
692 } else {
693 let condition_matched = match condition {
694 DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id),
695 DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id),
696 DialPeerCondition::Always => true,
697 };
698 if condition_matched {
699 if ExpandedSwarm::dial(this, &peer_id).is_ok() {
700 return Poll::Ready(SwarmEvent::Dialing(peer_id))
701 }
702 } else {
703 log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
707 peer_id, condition);
708 let self_listening = &this.listened_addrs;
709 if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
710 let addrs = this.behaviour.addresses_of_peer(peer.id());
711 let mut attempt = peer.some_attempt();
712 for a in addrs {
713 if !self_listening.contains(&a) {
714 attempt.add_address(a);
715 }
716 }
717 }
718 }
719 }
720 },
721 Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
722 if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
723 match handler {
724 NotifyHandler::One(connection) => {
725 if let Some(mut conn) = peer.connection(connection) {
726 if let Some(event) = notify_one(&mut conn, event, cx) {
727 let handler = PendingNotifyHandler::One(connection);
728 this.pending_event = Some((peer_id, handler, event));
729 return Poll::Pending
730 }
731 }
732 }
733 NotifyHandler::Any => {
734 let ids = peer.connections().into_ids().collect();
735 if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
736 let handler = PendingNotifyHandler::Any(ids);
737 this.pending_event = Some((peer_id, handler, event));
738 return Poll::Pending
739 }
740 }
741 }
742 }
743 },
744 Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
745 for addr in this.network.address_translation(&address) {
746 if this.external_addrs.iter().all(|a| a.addr != addr) {
747 this.behaviour.inject_new_external_addr(&addr);
748 }
749 this.external_addrs.add(addr, score);
750 }
751 },
752 }
753 }
754 }
755}
756
757enum PendingNotifyHandler {
764 One(ConnectionId),
765 Any(SmallVec<[ConnectionId; 10]>),
766}
767
768fn notify_one<'a, TInEvent>(
777 conn: &mut EstablishedConnection<'a, TInEvent>,
778 event: TInEvent,
779 cx: &mut Context<'_>,
780) -> Option<TInEvent>
781{
782 match conn.poll_ready_notify_handler(cx) {
783 Poll::Pending => Some(event),
784 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
786 let _ = conn.notify_handler(event);
788 None
789 }
790 }
791}
792
793fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
804 ids: SmallVec<[ConnectionId; 10]>,
805 peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
806 event: TInEvent,
807 cx: &mut Context<'_>,
808) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
809where
810 TTrans: Transport,
811 THandler: IntoConnectionHandler,
812{
813 let mut pending = SmallVec::new();
814 let mut event = Some(event); for id in ids.into_iter() {
816 if let Some(mut conn) = peer.connection(id) {
817 match conn.poll_ready_notify_handler(cx) {
818 Poll::Pending => pending.push(id),
819 Poll::Ready(Err(())) => {} Poll::Ready(Ok(())) => {
821 let e = event.take().expect("by (1),(2)");
822 if let Err(e) = conn.notify_handler(e) {
823 event = Some(e) } else {
825 break
826 }
827 }
828 }
829 }
830 }
831
832 event.and_then(|e|
833 if !pending.is_empty() {
834 Some((e, pending))
835 } else {
836 None
837 })
838}
839
840impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
841 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
842where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
843 THandler: IntoProtocolsHandler + Send + 'static,
844 TInEvent: Send + 'static,
845 TOutEvent: Send + 'static,
846 THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
847{
848 type Item = TBehaviour::OutEvent;
849
850 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
851 loop {
852 let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
853 if let SwarmEvent::Behaviour(event) = event {
854 return Poll::Ready(Some(event));
855 }
856 }
857 }
858}
859
860impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
862 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
863where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
864 THandler: IntoProtocolsHandler + Send + 'static,
865 TInEvent: Send + 'static,
866 TOutEvent: Send + 'static,
867 THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
868{
869 fn is_terminated(&self) -> bool {
870 false
871 }
872}
873
874pub struct SwarmPollParameters<'a> {
877 local_peer_id: &'a PeerId,
878 supported_protocols: &'a [Vec<u8>],
879 listened_addrs: &'a [Multiaddr],
880 external_addrs: &'a Addresses,
881}
882
883impl<'a> PollParameters for SwarmPollParameters<'a> {
884 type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
885 type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
886 type ExternalAddressesIter = AddressIntoIter;
887
888 fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
889 self.supported_protocols.to_vec().into_iter()
890 }
891
892 fn listened_addresses(&self) -> Self::ListenedAddressesIter {
893 self.listened_addrs.to_vec().into_iter()
894 }
895
896 fn external_addresses(&self) -> Self::ExternalAddressesIter {
897 self.external_addrs.clone().into_iter()
898 }
899
900 fn local_peer_id(&self) -> &PeerId {
901 &self.local_peer_id
902 }
903}
904
905pub struct SwarmBuilder<TBehaviour> {
908 local_peer_id: PeerId,
909 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
910 behaviour: TBehaviour,
911 network_config: NetworkConfig,
912 substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
913}
914
915impl<TBehaviour> SwarmBuilder<TBehaviour>
916where TBehaviour: NetworkBehaviour,
917{
918 pub fn new(
922 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
923 behaviour: TBehaviour,
924 local_peer_id: PeerId
925 ) -> Self {
926 SwarmBuilder {
927 local_peer_id,
928 transport,
929 behaviour,
930 network_config: Default::default(),
931 substream_upgrade_protocol_override: None,
932 }
933 }
934
935 pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
940 self.network_config = self.network_config.with_executor(e);
941 self
942 }
943
944 pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
954 self.network_config = self.network_config.with_notify_handler_buffer_size(n);
955 self
956 }
957
958 pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
982 self.network_config = self.network_config.with_connection_event_buffer_size(n);
983 self
984 }
985
986 pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
988 self.network_config = self.network_config.with_connection_limits(limits);
989 self
990 }
991
992 pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
1003 self.substream_upgrade_protocol_override = Some(v);
1004 self
1005 }
1006
1007 pub fn build(mut self) -> Swarm<TBehaviour> {
1009 let supported_protocols = self.behaviour
1010 .new_handler()
1011 .inbound_protocol()
1012 .protocol_info()
1013 .into_iter()
1014 .map(|info| info.protocol_name().to_vec())
1015 .collect();
1016
1017 let network_cfg = self.network_config.or_else_with_executor(|| {
1019 match ThreadPoolBuilder::new()
1020 .name_prefix("libp2p-swarm-task-")
1021 .create()
1022 {
1023 Ok(tp) => {
1024 Some(Box::new(move |f| tp.spawn_ok(f)))
1025 },
1026 Err(err) => {
1027 log::warn!("Failed to create executor thread pool: {:?}", err);
1028 None
1029 }
1030 }
1031 });
1032
1033 let network = Network::new(self.transport, self.local_peer_id, network_cfg);
1034
1035 ExpandedSwarm {
1036 network,
1037 behaviour: self.behaviour,
1038 supported_protocols,
1039 listened_addrs: SmallVec::new(),
1040 external_addrs: Addresses::default(),
1041 banned_peers: HashSet::new(),
1042 pending_event: None,
1043 substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
1044 }
1045 }
1046}
1047
1048#[derive(Debug)]
1050pub enum DialError {
1051 Banned,
1053 ConnectionLimit(ConnectionLimit),
1056 NoAddresses
1059}
1060
1061impl fmt::Display for DialError {
1062 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1063 match self {
1064 DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
1065 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1066 DialError::Banned => write!(f, "Dial error: peer is banned.")
1067 }
1068 }
1069}
1070
1071impl error::Error for DialError {
1072 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1073 match self {
1074 DialError::ConnectionLimit(err) => Some(err),
1075 DialError::NoAddresses => None,
1076 DialError::Banned => None
1077 }
1078 }
1079}
1080
1081#[derive(Clone, Default)]
1083pub struct DummyBehaviour {
1084}
1085
1086impl NetworkBehaviour for DummyBehaviour {
1087 type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
1088 type OutEvent = void::Void;
1089
1090 fn new_handler(&mut self) -> Self::ProtocolsHandler {
1091 protocols_handler::DummyProtocolsHandler::default()
1092 }
1093
1094 fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
1095 Vec::new()
1096 }
1097
1098 fn inject_connected(&mut self, _: &PeerId) {}
1099
1100 fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
1101
1102 fn inject_disconnected(&mut self, _: &PeerId) {}
1103
1104 fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
1105
1106 fn inject_event(&mut self, _: PeerId, _: ConnectionId,
1107 _: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
1108
1109 fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) ->
1110 Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
1111 ProtocolsHandler>::InEvent, Self::OutEvent>>
1112 {
1113 Poll::Pending
1114 }
1115}
1116
1117#[cfg(test)]
1118mod tests {
1119 use crate::protocols_handler::DummyProtocolsHandler;
1120 use crate::test::{MockBehaviour, CallTraceBehaviour};
1121 use futures::{future, executor};
1122 use libp2p_core::{
1123 identity,
1124 upgrade,
1125 multiaddr,
1126 transport
1127 };
1128 use libp2p_noise as noise;
1129 use super::*;
1130
1131 fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
1132 where
1133 T: ProtocolsHandler + Clone,
1134 T::OutEvent: Clone,
1135 O: Send + 'static
1136 {
1137 let id_keys = identity::Keypair::generate_ed25519();
1138 let pubkey = id_keys.public();
1139 let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
1140 let transport = transport::MemoryTransport::default()
1141 .upgrade(upgrade::Version::V1)
1142 .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
1143 .multiplex(libp2p_mplex::MplexConfig::new())
1144 .boxed();
1145 let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
1146 SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
1147 }
1148
1149 #[test]
1156 fn test_connect_disconnect_ban() {
1157 let mut handler_proto = DummyProtocolsHandler::default();
1160 handler_proto.keep_alive = KeepAlive::Yes;
1161
1162 let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
1163 let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
1164
1165 let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1166 let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
1167
1168 Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
1169 Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();
1170
1171 enum State {
1173 Connecting,
1174 Disconnecting,
1175 }
1176
1177 let swarm1_id = Swarm::local_peer_id(&swarm1).clone();
1178
1179 let mut banned = false;
1180 let mut unbanned = false;
1181
1182 let num_connections = 10;
1183
1184 for _ in 0 .. num_connections {
1185 Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
1186 }
1187 let mut state = State::Connecting;
1188
1189 executor::block_on(future::poll_fn(move |cx| {
1190 loop {
1191 let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
1192 let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
1193 match state {
1194 State::Connecting => {
1195 for s in &[&swarm1, &swarm2] {
1196 if s.behaviour.inject_connection_established.len() > 0 {
1197 assert_eq!(s.behaviour.inject_connected.len(), 1);
1198 } else {
1199 assert_eq!(s.behaviour.inject_connected.len(), 0);
1200 }
1201 assert!(s.behaviour.inject_connection_closed.len() == 0);
1202 assert!(s.behaviour.inject_disconnected.len() == 0);
1203 }
1204 if [&swarm1, &swarm2].iter().all(|s| {
1205 s.behaviour.inject_connection_established.len() == num_connections
1206 }) {
1207 if banned {
1208 return Poll::Ready(())
1209 }
1210 Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
1211 swarm1.behaviour.reset();
1212 swarm2.behaviour.reset();
1213 banned = true;
1214 state = State::Disconnecting;
1215 }
1216 }
1217 State::Disconnecting => {
1218 for s in &[&swarm1, &swarm2] {
1219 if s.behaviour.inject_connection_closed.len() < num_connections {
1220 assert_eq!(s.behaviour.inject_disconnected.len(), 0);
1221 } else {
1222 assert_eq!(s.behaviour.inject_disconnected.len(), 1);
1223 }
1224 assert_eq!(s.behaviour.inject_connection_established.len(), 0);
1225 assert_eq!(s.behaviour.inject_connected.len(), 0);
1226 }
1227 if [&swarm1, &swarm2].iter().all(|s| {
1228 s.behaviour.inject_connection_closed.len() == num_connections
1229 }) {
1230 if unbanned {
1231 return Poll::Ready(())
1232 }
1233 Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
1235 swarm1.behaviour.reset();
1236 swarm2.behaviour.reset();
1237 unbanned = true;
1238 for _ in 0 .. num_connections {
1239 Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
1240 }
1241 state = State::Connecting;
1242 }
1243 }
1244 }
1245
1246 if poll1.is_pending() && poll2.is_pending() {
1247 return Poll::Pending
1248 }
1249 }
1250 }))
1251 }
1252}