1mod behaviour;
57mod registry;
58#[cfg(test)]
59mod test;
60mod upgrade;
61
62pub mod protocols_handler;
63pub mod toggle;
64
65pub use behaviour::{
66 NetworkBehaviour,
67 NetworkBehaviourAction,
68 NetworkBehaviourEventProcess,
69 PollParameters,
70 NotifyHandler,
71 DialPeerCondition
72};
73pub use protocols_handler::{
74 IntoProtocolsHandler,
75 IntoProtocolsHandlerSelect,
76 KeepAlive,
77 ProtocolsHandler,
78 ProtocolsHandlerEvent,
79 ProtocolsHandlerSelect,
80 ProtocolsHandlerUpgrErr,
81 OneShotHandler,
82 OneShotHandlerConfig,
83 SubstreamProtocol
84};
85pub use registry::{AddressScore, AddressRecord, AddAddressResult};
86
87use protocols_handler::{
88 NodeHandlerWrapperBuilder,
89 NodeHandlerWrapperError,
90};
91use futures::{
92 prelude::*,
93 executor::ThreadPoolBuilder,
94 stream::FusedStream,
95};
96use mwc_libp2p_core::{
97 Executor,
98 Transport,
99 Multiaddr,
100 Negotiated,
101 PeerId,
102 connection::{
103 ConnectionError,
104 ConnectionId,
105 ConnectionLimit,
106 ConnectedPoint,
107 EstablishedConnection,
108 IntoConnectionHandler,
109 ListenerId,
110 PendingConnectionError,
111 Substream
112 },
113 transport::{self, TransportError},
114 muxing::StreamMuxerBox,
115 network::{
116 ConnectionLimits,
117 Network,
118 NetworkInfo,
119 NetworkEvent,
120 NetworkConfig,
121 peer::ConnectedPeer,
122 },
123 upgrade::{ProtocolName},
124};
125use registry::{Addresses, AddressIntoIter};
126use smallvec::SmallVec;
127use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
128use std::collections::HashSet;
129use std::num::{NonZeroU32, NonZeroUsize};
130use upgrade::UpgradeInfoSend as _;
131
/// Convenience alias for [`ExpandedSwarm`] in which the handler type and the
/// handler's input/output event types are all derived from the
/// `NetworkBehaviour` implementation `TBehaviour`.
pub type Swarm<TBehaviour> = ExpandedSwarm<
    TBehaviour,
    <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
    <<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
    <TBehaviour as NetworkBehaviour>::ProtocolsHandler,
>;
139
/// Shorthand for a [`Substream`] of a [`StreamMuxerBox`] wrapped in [`Negotiated`].
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
145
/// Event produced by the swarm when it is polled (see
/// `ExpandedSwarm::next_event`).
#[derive(Debug)]
pub enum SwarmEvent<TBvEv, THandleErr> {
    /// Event generated by the `NetworkBehaviour`
    /// (`NetworkBehaviourAction::GenerateEvent`).
    Behaviour(TBvEv),
    /// A connection to a (non-banned) peer has been established.
    ConnectionEstablished {
        /// Identity of the peer that was connected to.
        peer_id: PeerId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer as reported by the
        /// network; a value of `1` means this is the first one.
        num_established: NonZeroU32,
    },
    /// A connection with the given peer has been closed.
    ConnectionClosed {
        /// Identity of the peer the connection belonged to.
        peer_id: PeerId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of connections to this peer that remain established;
        /// `0` means the peer is now disconnected.
        num_established: u32,
        /// Error that caused the connection to close, if any. `None` is
        /// logged by the swarm as an "active close".
        cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
    },
    /// A new incoming connection was reported by the network and the swarm
    /// attempted to accept it. A rejected accept is only logged, so this
    /// event is emitted either way.
    IncomingConnection {
        /// Local address on which the connection arrived.
        local_addr: Multiaddr,
        /// Address to use to send data back to the remote.
        send_back_addr: Multiaddr,
    },
    /// An incoming connection failed before it was established.
    IncomingConnectionError {
        /// Local address on which the connection arrived.
        local_addr: Multiaddr,
        /// Address to use to send data back to the remote.
        send_back_addr: Multiaddr,
        /// The error reported by the network.
        error: PendingConnectionError<io::Error>,
    },
    /// A connection was established with a banned peer; the swarm has
    /// requested its disconnection and the behaviour was not notified.
    BannedPeer {
        /// Identity of the banned peer.
        peer_id: PeerId,
        /// Endpoint of the connection that was established.
        endpoint: ConnectedPoint,
    },
    /// A dialing attempt to an address of a known peer failed.
    UnreachableAddr {
        /// Identity of the peer that was being dialed.
        peer_id: PeerId,
        /// Address that could not be reached.
        address: Multiaddr,
        /// The error reported by the network.
        error: PendingConnectionError<io::Error>,
        /// Number of addresses still to be tried for this peer. When it is
        /// `0` the behaviour has been told that the dial failed.
        attempts_remaining: u32,
    },
    /// A dialing attempt to an address whose peer identity is unknown failed.
    UnknownPeerUnreachableAddr {
        /// Address that could not be reached.
        address: Multiaddr,
        /// The error reported by the network.
        error: PendingConnectionError<io::Error>,
    },
    /// A listener reported a new address it is listening on.
    NewListenAddr(Multiaddr),
    /// A listener reported that one of its addresses is no longer valid.
    ExpiredListenAddr(Multiaddr),
    /// A listener has been closed.
    ListenerClosed {
        /// Addresses the listener was listening on, as reported by the network.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure: `Ok(())` or the error that caused it.
        reason: Result<(), io::Error>,
    },
    /// A listener reported an error.
    ListenerError {
        /// The error reported by the listener.
        error: io::Error,
    },
    /// The behaviour requested a dial to the given peer and the dialing
    /// attempt was started successfully.
    Dialing(PeerId),
}
256
/// Combines a `Network` with a `NetworkBehaviour`: network events are fed to
/// the behaviour and the actions returned by the behaviour are applied to the
/// network.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
    THandler: IntoProtocolsHandler,
{
    /// The underlying network (connections, listeners, dialing attempts).
    network: Network<
        transport::Boxed<(PeerId, StreamMuxerBox)>,
        TInEvent,
        TOutEvent,
        NodeHandlerWrapperBuilder<THandler>,
    >,

    /// The behaviour driving the network. Also reachable through
    /// `Deref`/`DerefMut` on the swarm.
    behaviour: TBehaviour,

    /// Protocol names collected once, at build time, from the inbound
    /// protocol of a handler created by the behaviour.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// Addresses reported by listeners and not yet reported as expired.
    listened_addrs: SmallVec<[Multiaddr; 8]>,

    /// External addresses, added explicitly or reported by the behaviour
    /// through `ReportObservedAddr`.
    external_addrs: Addresses,

    /// Peers that are banned: dialing them fails and established
    /// connections to them are disconnected.
    banned_peers: HashSet<PeerId>,

    /// Event the behaviour wanted delivered to a handler that could not be
    /// delivered yet. It is retried on every poll before the behaviour is
    /// polled again.
    pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,

    /// Optional upgrade protocol version passed to every handler builder via
    /// `with_substream_upgrade_protocol_override`.
    substream_upgrade_protocol_override: Option<mwc_libp2p_core::upgrade::Version>,
}
294
295impl<TBehaviour, TInEvent, TOutEvent, THandler> Deref for
296 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
297where
298 THandler: IntoProtocolsHandler,
299{
300 type Target = TBehaviour;
301
302 fn deref(&self) -> &Self::Target {
303 &self.behaviour
304 }
305}
306
307impl<TBehaviour, TInEvent, TOutEvent, THandler> DerefMut for
308 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
309where
310 THandler: IntoProtocolsHandler,
311{
312 fn deref_mut(&mut self) -> &mut Self::Target {
313 &mut self.behaviour
314 }
315}
316
/// The swarm is `Unpin` regardless of its type parameters. `next_event`,
/// `next` and the `Stream` implementation rely on this to obtain a
/// `Pin<&mut Self>` from a plain mutable reference.
impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where
    THandler: IntoProtocolsHandler,
{
}
323
324impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
325 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
326where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
327 TInEvent: Send + 'static,
328 TOutEvent: Send + 'static,
329 THandler: IntoProtocolsHandler + Send + 'static,
330 THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
331 THandleErr: error::Error + Send + 'static,
332{
    /// Builds a new swarm with the default configuration.
    ///
    /// Shorthand for `SwarmBuilder::new(transport, behaviour, local_peer_id).build()`.
    pub fn new(
        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
        behaviour: TBehaviour,
        local_peer_id: PeerId
    ) -> Self {
        SwarmBuilder::new(transport, behaviour, local_peer_id).build()
    }
341
    /// Returns information about the underlying `Network`.
    pub fn network_info(me: &Self) -> NetworkInfo {
        me.network.info()
    }
346
    /// Starts listening on the given address through the underlying network.
    ///
    /// Returns the identifier of the new listener, or the transport error
    /// reported by the network.
    pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
        me.network.listen_on(addr)
    }
353
    /// Removes the listener with the given identifier.
    ///
    /// The result is forwarded unchanged from `Network::remove_listener`.
    // NOTE(review): presumably `Err(())` means no listener with this id exists — confirm.
    pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
        me.network.remove_listener(id)
    }
360
361 pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
363 let handler = me.behaviour.new_handler()
364 .into_node_handler_builder()
365 .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
366 me.network.dial(&addr, handler).map(|_id| ())
367 }
368
369 pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
371 if me.banned_peers.contains(peer_id) {
372 me.behaviour.inject_dial_failure(peer_id);
373 return Err(DialError::Banned)
374 }
375
376 let self_listening = &me.listened_addrs;
377 let mut addrs = me.behaviour.addresses_of_peer(peer_id)
378 .into_iter()
379 .filter(|a| !self_listening.contains(a));
380
381 let result =
382 if let Some(first) = addrs.next() {
383 let handler = me.behaviour.new_handler()
384 .into_node_handler_builder()
385 .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override);
386 me.network.peer(*peer_id)
387 .dial(first, addrs, handler)
388 .map(|_| ())
389 .map_err(DialError::ConnectionLimit)
390 } else {
391 Err(DialError::NoAddresses)
392 };
393
394 if let Err(error) = &result {
395 log::debug!(
396 "New dialing attempt to peer {:?} failed: {:?}.",
397 peer_id, error);
398 me.behaviour.inject_dial_failure(&peer_id);
399 }
400
401 result
402 }
403
    /// Returns an iterator over the addresses the underlying network is
    /// listening on.
    pub fn listeners(me: &Self) -> impl Iterator<Item = &Multiaddr> {
        me.network.listen_addrs()
    }
408
    /// Returns the peer id of the local node, as known to the network.
    pub fn local_peer_id(me: &Self) -> &PeerId {
        me.network.local_peer_id()
    }
413
    /// Returns an iterator over the external address records currently
    /// tracked by the swarm.
    pub fn external_addresses(me: &Self) -> impl Iterator<Item = &AddressRecord> {
        me.external_addrs.iter()
    }
420
    /// Adds an external address record with the given score to the swarm's
    /// address collection.
    ///
    /// The result is forwarded unchanged from `Addresses::add`. Unlike
    /// addresses reported by the behaviour via `ReportObservedAddr`, the
    /// behaviour is not notified here.
    pub fn add_external_address(me: &mut Self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
        me.external_addrs.add(a, s)
    }
438
    /// Removes an external address from the swarm's address collection.
    ///
    /// The boolean result is forwarded unchanged from `Addresses::remove`.
    // NOTE(review): presumably `true` means the address was present — confirm.
    pub fn remove_external_address(me: &mut Self, addr: &Multiaddr) -> bool {
        me.external_addrs.remove(addr)
    }
448
449 pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
454 if me.banned_peers.insert(peer_id) {
455 if let Some(peer) = me.network.peer(peer_id).into_connected() {
456 peer.disconnect();
457 }
458 }
459 }
460
    /// Removes the peer from the set of banned peers. Does nothing if the
    /// peer was not banned.
    pub fn unban_peer_id(me: &mut Self, peer_id: PeerId) {
        me.banned_peers.remove(&peer_id);
    }
465
    /// Reports whether the network considers the given peer connected.
    pub fn is_connected(me: &Self, peer_id: &PeerId) -> bool {
        me.network.is_connected(peer_id)
    }
470
    /// Reports whether the network is currently dialing the given peer.
    pub fn is_dialing(me: &Self, peer_id: &PeerId) -> bool {
        me.network.is_dialing(peer_id)
    }
475
    /// Returns the next event that happens in the swarm.
    ///
    /// Unlike `next`, this yields every `SwarmEvent`, not only the events
    /// produced by the behaviour.
    pub async fn next_event(&mut self) -> SwarmEvent<TBehaviour::OutEvent, THandleErr> {
        future::poll_fn(move |cx| ExpandedSwarm::poll_next_event(Pin::new(self), cx)).await
    }
482
483 pub async fn next(&mut self) -> TBehaviour::OutEvent {
485 future::poll_fn(move |cx| {
486 loop {
487 let event = futures::ready!(ExpandedSwarm::poll_next_event(Pin::new(self), cx));
488 if let SwarmEvent::Behaviour(event) = event {
489 return Poll::Ready(event);
490 }
491 }
492 }).await
493 }
494
495 fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
499 -> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
500 {
501 let this = &mut *self;
504
505 loop {
506 let mut network_not_ready = false;
507
508 match this.network.poll(cx) {
510 Poll::Pending => network_not_ready = true,
511 Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
512 let peer = connection.peer_id();
513 let connection = connection.id();
514 this.behaviour.inject_event(peer, connection, event);
515 },
516 Poll::Ready(NetworkEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
517 let peer = connection.peer_id();
518 let connection = connection.id();
519 this.behaviour.inject_address_change(&peer, &connection, &old_endpoint, &new_endpoint);
520 },
521 Poll::Ready(NetworkEvent::ConnectionEstablished { connection, num_established }) => {
522 let peer_id = connection.peer_id();
523 let endpoint = connection.endpoint().clone();
524 if this.banned_peers.contains(&peer_id) {
525 this.network.peer(peer_id)
526 .into_connected()
527 .expect("the Network just notified us that we were connected; QED")
528 .disconnect();
529 return Poll::Ready(SwarmEvent::BannedPeer {
530 peer_id,
531 endpoint,
532 });
533 } else {
534 log::debug!("Connection established: {:?}; Total (peer): {}.",
535 connection.connected(), num_established);
536 let endpoint = connection.endpoint().clone();
537 this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
538 if num_established.get() == 1 {
539 this.behaviour.inject_connected(&peer_id);
540 }
541 return Poll::Ready(SwarmEvent::ConnectionEstablished {
542 peer_id, num_established, endpoint
543 });
544 }
545 },
546 Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
547 if let Some(error) = error.as_ref() {
548 log::debug!("Connection {:?} closed: {:?}", connected, error);
549 } else {
550 log::debug!("Connection {:?} closed (active close).", connected);
551 }
552 let peer_id = connected.peer_id;
553 let endpoint = connected.endpoint;
554 this.behaviour.inject_connection_closed(&peer_id, &id, &endpoint);
555 if num_established == 0 {
556 this.behaviour.inject_disconnected(&peer_id);
557 }
558 return Poll::Ready(SwarmEvent::ConnectionClosed {
559 peer_id,
560 endpoint,
561 cause: error,
562 num_established,
563 });
564 },
565 Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
566 let handler = this.behaviour.new_handler()
567 .into_node_handler_builder()
568 .with_substream_upgrade_protocol_override(this.substream_upgrade_protocol_override);
569 let local_addr = connection.local_addr.clone();
570 let send_back_addr = connection.send_back_addr.clone();
571 if let Err(e) = this.network.accept(connection, handler) {
572 log::warn!("Incoming connection rejected: {:?}", e);
573 }
574 return Poll::Ready(SwarmEvent::IncomingConnection {
575 local_addr,
576 send_back_addr,
577 });
578 },
579 Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) => {
580 log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
581 if !this.listened_addrs.contains(&listen_addr) {
582 this.listened_addrs.push(listen_addr.clone())
583 }
584 this.behaviour.inject_new_listen_addr(&listen_addr);
585 return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
586 }
587 Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
588 log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
589 this.listened_addrs.retain(|a| a != &listen_addr);
590 this.behaviour.inject_expired_listen_addr(&listen_addr);
591 return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
592 }
593 Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
594 log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
595 for addr in addresses.iter() {
596 this.behaviour.inject_expired_listen_addr(addr);
597 }
598 this.behaviour.inject_listener_closed(listener_id, match &reason {
599 Ok(()) => Ok(()),
600 Err(err) => Err(err),
601 });
602 return Poll::Ready(SwarmEvent::ListenerClosed {
603 addresses,
604 reason,
605 });
606 }
607 Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) => {
608 this.behaviour.inject_listener_error(listener_id, &error);
609 return Poll::Ready(SwarmEvent::ListenerError {
610 error,
611 });
612 },
613 Poll::Ready(NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => {
614 log::debug!("Incoming connection failed: {:?}", error);
615 return Poll::Ready(SwarmEvent::IncomingConnectionError {
616 local_addr,
617 send_back_addr,
618 error,
619 });
620 },
621 Poll::Ready(NetworkEvent::DialError { peer_id, multiaddr, error, attempts_remaining }) => {
622 log::debug!(
623 "Connection attempt to {:?} via {:?} failed with {:?}. Attempts remaining: {}.",
624 peer_id, multiaddr, error, attempts_remaining);
625 this.behaviour.inject_addr_reach_failure(Some(&peer_id), &multiaddr, &error);
626 if attempts_remaining == 0 {
627 this.behaviour.inject_dial_failure(&peer_id);
628 }
629 return Poll::Ready(SwarmEvent::UnreachableAddr {
630 peer_id,
631 address: multiaddr,
632 error,
633 attempts_remaining,
634 });
635 },
636 Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error, .. }) => {
637 log::debug!("Connection attempt to address {:?} of unknown peer failed with {:?}",
638 multiaddr, error);
639 this.behaviour.inject_addr_reach_failure(None, &multiaddr, &error);
640 return Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr {
641 address: multiaddr,
642 error,
643 });
644 },
645 }
646
647 if let Some((peer_id, handler, event)) = this.pending_event.take() {
653 if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
654 match handler {
655 PendingNotifyHandler::One(conn_id) =>
656 if let Some(mut conn) = peer.connection(conn_id) {
657 if let Some(event) = notify_one(&mut conn, event, cx) {
658 this.pending_event = Some((peer_id, handler, event));
659 return Poll::Pending
660 }
661 },
662 PendingNotifyHandler::Any(ids) => {
663 if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
664 let handler = PendingNotifyHandler::Any(ids);
665 this.pending_event = Some((peer_id, handler, event));
666 return Poll::Pending
667 }
668 }
669 }
670 }
671 }
672
673 debug_assert!(this.pending_event.is_none());
674
675 let behaviour_poll = {
676 let mut parameters = SwarmPollParameters {
677 local_peer_id: &mut this.network.local_peer_id(),
678 supported_protocols: &this.supported_protocols,
679 listened_addrs: &this.listened_addrs,
680 external_addrs: &this.external_addrs
681 };
682 this.behaviour.poll(cx, &mut parameters)
683 };
684
685 match behaviour_poll {
686 Poll::Pending if network_not_ready => return Poll::Pending,
687 Poll::Pending => (),
688 Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
689 return Poll::Ready(SwarmEvent::Behaviour(event))
690 },
691 Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
692 let _ = ExpandedSwarm::dial_addr(&mut *this, address);
693 },
694 Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
695 if this.banned_peers.contains(&peer_id) {
696 this.behaviour.inject_dial_failure(&peer_id);
697 } else {
698 let condition_matched = match condition {
699 DialPeerCondition::Disconnected => this.network.is_disconnected(&peer_id),
700 DialPeerCondition::NotDialing => !this.network.is_dialing(&peer_id),
701 DialPeerCondition::Always => true,
702 };
703 if condition_matched {
704 if ExpandedSwarm::dial(this, &peer_id).is_ok() {
705 return Poll::Ready(SwarmEvent::Dialing(peer_id))
706 }
707 } else {
708 log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
712 peer_id, condition);
713 let self_listening = &this.listened_addrs;
714 if let Some(mut peer) = this.network.peer(peer_id).into_dialing() {
715 let addrs = this.behaviour.addresses_of_peer(peer.id());
716 let mut attempt = peer.some_attempt();
717 for a in addrs {
718 if !self_listening.contains(&a) {
719 attempt.add_address(a);
720 }
721 }
722 }
723 }
724 }
725 },
726 Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
727 if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
728 match handler {
729 NotifyHandler::One(connection) => {
730 if let Some(mut conn) = peer.connection(connection) {
731 if let Some(event) = notify_one(&mut conn, event, cx) {
732 let handler = PendingNotifyHandler::One(connection);
733 this.pending_event = Some((peer_id, handler, event));
734 return Poll::Pending
735 }
736 }
737 }
738 NotifyHandler::Any => {
739 let ids = peer.connections().into_ids().collect();
740 if let Some((event, ids)) = notify_any(ids, &mut peer, event, cx) {
741 let handler = PendingNotifyHandler::Any(ids);
742 this.pending_event = Some((peer_id, handler, event));
743 return Poll::Pending
744 }
745 }
746 }
747 }
748 },
749 Poll::Ready(NetworkBehaviourAction::DisconnectPeer { peer_id }) => {
750 this.disconnect_peer(&peer_id);
751 },
752 Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
753 for addr in this.network.address_translation(&address) {
754 if this.external_addrs.iter().all(|a| a.addr != addr) {
755 this.behaviour.inject_new_external_addr(&addr);
756 }
757 this.external_addrs.add(addr, score);
758 }
759 },
760 }
761 }
762 }
763
764 pub fn disconnect_peer( &mut self, peer_id: &PeerId ) {
765 if let Some(mut peer) = self.network.peer(peer_id.clone()).into_connected() {
766 let mut con_iter = peer.connections();
767 loop {
768 if let Some(con) = con_iter.next() {
769 con.start_close();
770 }
771 else {
772 break;
773 }
774 }
775 }
776 }
777
    /// Returns a mutable reference to the wrapped `NetworkBehaviour`.
    ///
    /// Equivalent to going through the `DerefMut` implementation of the swarm.
    pub fn get_behaviour(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }
781
782}
783
/// Connections that are eligible to receive a pending event, i.e. one that
/// could not be delivered to a handler yet and is stored in
/// `ExpandedSwarm::pending_event`.
enum PendingNotifyHandler {
    /// The event must be delivered to this specific connection.
    One(ConnectionId),
    /// The event may be delivered to any one of these connections, all of
    /// which were not ready when delivery was last attempted.
    Any(SmallVec<[ConnectionId; 10]>),
}
794
795fn notify_one<'a, TInEvent>(
804 conn: &mut EstablishedConnection<'a, TInEvent>,
805 event: TInEvent,
806 cx: &mut Context<'_>,
807) -> Option<TInEvent>
808{
809 match conn.poll_ready_notify_handler(cx) {
810 Poll::Pending => Some(event),
811 Poll::Ready(Err(())) => None, Poll::Ready(Ok(())) => {
813 let _ = conn.notify_handler(event);
815 None
816 }
817 }
818}
819
/// Notify any one of a given list of connections of a peer of an event.
///
/// Returns `Some` with the given event and a new list of connections if
/// the event could not be delivered and at least one of the given
/// connections was not ready (`Poll::Pending`). The returned list contains
/// exactly those pending connections, to be retried later.
///
/// Returns `None` if the event was delivered to one of the connections, or
/// if no connection was pending (in which case the event is dropped).
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
    ids: SmallVec<[ConnectionId; 10]>,
    peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
    event: TInEvent,
    cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
where
    TTrans: Transport,
    THandler: IntoConnectionHandler,
{
    let mut pending = SmallVec::new();
    let mut event = Some(event); // (1)
    for id in ids.into_iter() {
        // Connections that no longer exist are skipped.
        if let Some(mut conn) = peer.connection(id) {
            match conn.poll_ready_notify_handler(cx) {
                Poll::Pending => pending.push(id),
                // NOTE(review): presumably the connection is closing — confirm.
                Poll::Ready(Err(())) => {}
                Poll::Ready(Ok(())) => {
                    // `event` is `Some` here: it starts as `Some` (1) and is
                    // put back whenever delivery fails (2); on success the
                    // loop is left immediately.
                    let e = event.take().expect("by (1),(2)");
                    if let Err(e) = conn.notify_handler(e) {
                        event = Some(e) // (2)
                    } else {
                        break
                    }
                }
            }
        }
    }

    // An undelivered event is only worth keeping if some connection may
    // become ready later.
    event.and_then(|e|
        if !pending.is_empty() {
            Some((e, pending))
        } else {
            None
        })
}
866
867impl<TBehaviour, TInEvent, TOutEvent, THandler> Stream for
868 ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
869where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
870 THandler: IntoProtocolsHandler + Send + 'static,
871 TInEvent: Send + 'static,
872 TOutEvent: Send + 'static,
873 THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
874{
875 type Item = TBehaviour::OutEvent;
876
877 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
878 loop {
879 let event = futures::ready!(ExpandedSwarm::poll_next_event(self.as_mut(), cx));
880 if let SwarmEvent::Behaviour(event) = event {
881 return Poll::Ready(Some(event));
882 }
883 }
884 }
885}
886
/// The swarm's `Stream` implementation never yields `None`, so the stream is
/// never terminated.
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
    ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
      THandler: IntoProtocolsHandler + Send + 'static,
      TInEvent: Send + 'static,
      TOutEvent: Send + 'static,
      THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
{
    /// Always `false`.
    fn is_terminated(&self) -> bool {
        false
    }
}
900
/// Parameters passed to the behaviour's `poll()`; a borrowed view of the
/// swarm's state that implements `PollParameters`.
pub struct SwarmPollParameters<'a> {
    /// Peer id of the local node.
    local_peer_id: &'a PeerId,
    /// Protocol names collected by the swarm at build time.
    supported_protocols: &'a [Vec<u8>],
    /// Addresses the swarm currently records as listened on.
    listened_addrs: &'a [Multiaddr],
    /// External addresses currently tracked by the swarm.
    external_addrs: &'a Addresses,
}
909
910impl<'a> PollParameters for SwarmPollParameters<'a> {
911 type SupportedProtocolsIter = std::vec::IntoIter<Vec<u8>>;
912 type ListenedAddressesIter = std::vec::IntoIter<Multiaddr>;
913 type ExternalAddressesIter = AddressIntoIter;
914
915 fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
916 self.supported_protocols.to_vec().into_iter()
917 }
918
919 fn listened_addresses(&self) -> Self::ListenedAddressesIter {
920 self.listened_addrs.to_vec().into_iter()
921 }
922
923 fn external_addresses(&self) -> Self::ExternalAddressesIter {
924 self.external_addrs.clone().into_iter()
925 }
926
927 fn local_peer_id(&self) -> &PeerId {
928 &self.local_peer_id
929 }
930}
931
/// A builder for an [`ExpandedSwarm`], allowing the network configuration to
/// be customised before the swarm is created.
pub struct SwarmBuilder<TBehaviour> {
    /// Peer id of the local node.
    local_peer_id: PeerId,
    /// Transport handed to the `Network`.
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
    /// Behaviour the swarm will drive.
    behaviour: TBehaviour,
    /// Configuration of the `Network`, adjusted by the builder methods.
    network_config: NetworkConfig,
    /// Optional substream upgrade protocol version, stored in the swarm and
    /// passed to every handler builder.
    substream_upgrade_protocol_override: Option<mwc_libp2p_core::upgrade::Version>,
}
941
942impl<TBehaviour> SwarmBuilder<TBehaviour>
943where TBehaviour: NetworkBehaviour,
944{
945 pub fn new(
949 transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
950 behaviour: TBehaviour,
951 local_peer_id: PeerId
952 ) -> Self {
953 SwarmBuilder {
954 local_peer_id,
955 transport,
956 behaviour,
957 network_config: Default::default(),
958 substream_upgrade_protocol_override: None,
959 }
960 }
961
962 pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
967 self.network_config = self.network_config.with_executor(e);
968 self
969 }
970
971 pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
981 self.network_config = self.network_config.with_notify_handler_buffer_size(n);
982 self
983 }
984
985 pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
1009 self.network_config = self.network_config.with_connection_event_buffer_size(n);
1010 self
1011 }
1012
1013 pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
1015 self.network_config = self.network_config.with_connection_limits(limits);
1016 self
1017 }
1018
1019 pub fn substream_upgrade_protocol_override(mut self, v: mwc_libp2p_core::upgrade::Version) -> Self {
1030 self.substream_upgrade_protocol_override = Some(v);
1031 self
1032 }
1033
1034 pub fn build(mut self) -> Swarm<TBehaviour> {
1036 let supported_protocols = self.behaviour
1037 .new_handler()
1038 .inbound_protocol()
1039 .protocol_info()
1040 .into_iter()
1041 .map(|info| info.protocol_name().to_vec())
1042 .collect();
1043
1044 let network_cfg = self.network_config.or_else_with_executor(|| {
1046 match ThreadPoolBuilder::new()
1047 .name_prefix("mwc-libp2p-swarm-task-")
1048 .create()
1049 {
1050 Ok(tp) => {
1051 Some(Box::new(move |f| tp.spawn_ok(f)))
1052 },
1053 Err(err) => {
1054 log::warn!("Failed to create executor thread pool: {:?}", err);
1055 None
1056 }
1057 }
1058 });
1059
1060 let network = Network::new(self.transport, self.local_peer_id, network_cfg);
1061
1062 ExpandedSwarm {
1063 network,
1064 behaviour: self.behaviour,
1065 supported_protocols,
1066 listened_addrs: SmallVec::new(),
1067 external_addrs: Addresses::default(),
1068 banned_peers: HashSet::new(),
1069 pending_event: None,
1070 substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
1071 }
1072 }
1073}
1074
/// The possible failures of [`ExpandedSwarm::dial`].
#[derive(Debug)]
pub enum DialError {
    /// The peer is currently banned.
    Banned,
    /// The network refused the dial with a `ConnectionLimit` error.
    ConnectionLimit(ConnectionLimit),
    /// The behaviour provided no address to dial that the swarm is not
    /// itself listening on.
    NoAddresses
}
1087
1088impl fmt::Display for DialError {
1089 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1090 match self {
1091 DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
1092 DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
1093 DialError::Banned => write!(f, "Dial error: peer is banned.")
1094 }
1095 }
1096}
1097
1098impl error::Error for DialError {
1099 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
1100 match self {
1101 DialError::ConnectionLimit(err) => Some(err),
1102 DialError::NoAddresses => None,
1103 DialError::Banned => None
1104 }
1105 }
1106}
1107
/// Dummy implementation of `NetworkBehaviour` that does nothing: it knows no
/// addresses, ignores every notification and never produces an action.
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
1112
impl NetworkBehaviour for DummyBehaviour {
    type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
    // `Void` cannot be instantiated, so this behaviour can never emit an event.
    type OutEvent = void::Void;

    /// Returns a default `DummyProtocolsHandler`.
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        protocols_handler::DummyProtocolsHandler::default()
    }

    /// This behaviour knows no addresses for any peer.
    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
        Vec::new()
    }

    // All notifications are ignored.

    fn inject_connected(&mut self, _: &PeerId) {}

    fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}

    fn inject_disconnected(&mut self, _: &PeerId) {}

    fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}

    fn inject_event(&mut self, _: PeerId, _: ConnectionId,
        _: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}

    /// Never produces an action: always returns `Poll::Pending`.
    fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) ->
        Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
        ProtocolsHandler>::InEvent, Self::OutEvent>>
    {
        Poll::Pending
    }
}
1143
#[cfg(test)]
mod tests {
    use crate::protocols_handler::DummyProtocolsHandler;
    use crate::test::{MockBehaviour, CallTraceBehaviour};
    use futures::{future, executor};
    use mwc_libp2p_core::{
        identity,
        upgrade,
        multiaddr,
        transport
    };
    use mwc_libp2p_noise as noise;
    use super::*;

    /// Builds a swarm over an in-memory transport (noise authentication,
    /// mplex multiplexing) with a freshly generated identity. The behaviour
    /// is a `MockBehaviour` wrapped in a `CallTraceBehaviour`, whose recorded
    /// calls the tests inspect.
    fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
    where
        T: ProtocolsHandler + Clone,
        T::OutEvent: Clone,
        O: Send + 'static
    {
        let id_keys = identity::Keypair::generate_ed25519();
        let pubkey = id_keys.public();
        let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
        let transport = transport::MemoryTransport::default()
            .upgrade(upgrade::Version::V1)
            .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
            .multiplex(mwc_libp2p_mplex::MplexConfig::new())
            .boxed();
        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
        SwarmBuilder::new(transport, behaviour, PeerId::from_public_key(pubkey)).build()
    }

    /// Establishes a number of connections between two peers, after which
    /// one peer bans the other. The peer is then unbanned and the connections
    /// are re-established in the opposite direction.
    ///
    /// The test expects both behaviours to be notified via pairs of
    /// inject_connected / inject_disconnected as well as
    /// inject_connection_established / inject_connection_closed calls.
    #[test]
    fn test_connect_disconnect_ban() {
        // No substreams are opened in this test, so the dummy handler is
        // enough; it is set to keep connections alive.
        let mut handler_proto = DummyProtocolsHandler::default();
        handler_proto.keep_alive = KeepAlive::Yes;

        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
        Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();

        // Test execution state: connections are either being established or
        // being closed.
        enum State {
            Connecting,
            Disconnecting,
        }

        let swarm1_id = Swarm::local_peer_id(&swarm1).clone();

        let mut banned = false;
        let mut unbanned = false;

        let num_connections = 10;

        // Phase 1: swarm1 dials swarm2 `num_connections` times.
        for _ in 0 .. num_connections {
            Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        executor::block_on(future::poll_fn(move |cx| {
            loop {
                let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
                let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
                match state {
                    State::Connecting => {
                        // `inject_connected` must be called exactly once, on
                        // the first established connection, and nothing may
                        // be closed while connecting.
                        for s in &[&swarm1, &swarm2] {
                            if s.behaviour.inject_connection_established.len() > 0 {
                                assert_eq!(s.behaviour.inject_connected.len(), 1);
                            } else {
                                assert_eq!(s.behaviour.inject_connected.len(), 0);
                            }
                            assert!(s.behaviour.inject_connection_closed.len() == 0);
                            assert!(s.behaviour.inject_disconnected.len() == 0);
                        }
                        if [&swarm1, &swarm2].iter().all(|s| {
                            s.behaviour.inject_connection_established.len() == num_connections
                        }) {
                            if banned {
                                return Poll::Ready(())
                            }
                            // Phase 2: swarm2 bans swarm1, which must close
                            // all connections.
                            Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
                            swarm1.behaviour.reset();
                            swarm2.behaviour.reset();
                            banned = true;
                            state = State::Disconnecting;
                        }
                    }
                    State::Disconnecting => {
                        // `inject_disconnected` must be called exactly once,
                        // after the last connection closed, and nothing may
                        // be established while disconnecting.
                        for s in &[&swarm1, &swarm2] {
                            if s.behaviour.inject_connection_closed.len() < num_connections {
                                assert_eq!(s.behaviour.inject_disconnected.len(), 0);
                            } else {
                                assert_eq!(s.behaviour.inject_disconnected.len(), 1);
                            }
                            assert_eq!(s.behaviour.inject_connection_established.len(), 0);
                            assert_eq!(s.behaviour.inject_connected.len(), 0);
                        }
                        if [&swarm1, &swarm2].iter().all(|s| {
                            s.behaviour.inject_connection_closed.len() == num_connections
                        }) {
                            if unbanned {
                                return Poll::Ready(())
                            }
                            // Phase 3: unban the first peer and reconnect,
                            // this time with swarm2 dialing swarm1.
                            Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
                            swarm1.behaviour.reset();
                            swarm2.behaviour.reset();
                            unbanned = true;
                            for _ in 0 .. num_connections {
                                Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
                            }
                            state = State::Connecting;
                        }
                    }
                }

                // Only yield once neither swarm can make further progress.
                if poll1.is_pending() && poll2.is_pending() {
                    return Poll::Pending
                }
            }
        }))
    }
}