1use std::{
22 cmp::{
23 max,
24 Ordering::{self, Equal},
25 },
26 collections::{BTreeSet, HashMap, HashSet, VecDeque},
27 fmt::{self, Debug},
28 net::IpAddr,
29 task::{Context, Poll},
30 time::Duration,
31};
32
33use futures::FutureExt;
34use futures_timer::Delay;
35use hashlink::LinkedHashMap;
36use libp2p_core::{
37 multiaddr::Protocol::{Ip4, Ip6},
38 transport::PortUse,
39 Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44 dial_opts::DialOpts,
45 ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46 THandlerOutEvent, ToSwarm,
47};
48#[cfg(feature = "metrics")]
49use prometheus_client::registry::Registry;
50use quick_protobuf::{MessageWrite, Writer};
51use rand::{seq::SliceRandom, thread_rng};
52use web_time::{Instant, SystemTime};
53
54#[cfg(feature = "metrics")]
55use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
56use crate::{
57 backoff::BackoffStorage,
58 config::{Config, ValidationMode},
59 gossip_promises::GossipPromises,
60 handler::{Handler, HandlerEvent, HandlerIn},
61 mcache::MessageCache,
62 peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
63 protocol::SIGNING_PREFIX,
64 rpc::Sender,
65 rpc_proto::proto,
66 subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
67 time_cache::DuplicateCache,
68 topic::{Hasher, Topic, TopicHash},
69 transform::{DataTransform, IdentityTransform},
70 types::{
71 ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
72 PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
73 SubscriptionAction,
74 },
75 FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
76};
77
78#[cfg(test)]
79mod tests;
80
/// Numeric cap (10 000) associated with IDONTWANT handling.
///
/// NOTE(review): not referenced in this part of the file; presumably bounds how many
/// IDONTWANT message ids are tracked per peer — confirm at the usage site.
const IDONTWANT_CAP: usize = 10_000;

/// Three-second duration associated with IDONTWANT handling.
///
/// NOTE(review): not referenced in this part of the file; presumably the time after which
/// a recorded IDONTWANT entry is discarded — confirm at the usage site.
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);
86
/// Selects how messages published by the local node are attributed.
///
/// The value is converted into the internal `PublishConfig` (see the `From` impl in this
/// file) when a `Behaviour` is constructed.
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Publish with the given keypair. The author is the peer id derived from the keypair's
    /// public key and a sequence-number counter is initialised for the publisher.
    Signed(Keypair),
    /// Publish with the given peer id recorded as the author; no keypair is held.
    Author(PeerId),
    /// No fixed author is stored.
    /// NOTE(review): presumably a random author is generated per message — confirm in the
    /// message-building code, which is outside this section.
    RandomAuthor,
    /// No author is stored. `is_anonymous` returns `true` only for this variant.
    Anonymous,
}
119
120impl MessageAuthenticity {
121 pub fn is_signing(&self) -> bool {
123 matches!(self, MessageAuthenticity::Signed(_))
124 }
125
126 pub fn is_anonymous(&self) -> bool {
127 matches!(self, MessageAuthenticity::Anonymous)
128 }
129}
130
/// Events produced by the gossipsub behaviour.
///
/// NOTE(review): none of these variants is constructed in the visible part of the file;
/// the per-variant descriptions below are inferred from names and field types and should
/// be confirmed against the emission sites.
#[derive(Debug)]
pub enum Event {
    /// Presumably: a message was received.
    Message {
        /// Presumably the peer that forwarded the message to us (not necessarily its author).
        propagation_source: PeerId,
        /// Id of the message.
        message_id: MessageId,
        /// The message itself.
        message: Message,
    },
    /// Presumably: a remote peer subscribed to a topic.
    Subscribed {
        /// The peer concerned.
        peer_id: PeerId,
        /// The topic concerned.
        topic: TopicHash,
    },
    /// Presumably: a remote peer unsubscribed from a topic.
    Unsubscribed {
        /// The peer concerned.
        peer_id: PeerId,
        /// The topic concerned.
        topic: TopicHash,
    },
    /// Presumably: the peer does not support the gossipsub protocol.
    GossipsubNotSupported { peer_id: PeerId },
    /// Presumably: messages could not be delivered to a peer in time.
    SlowPeer {
        /// The peer concerned.
        peer_id: PeerId,
        /// Record of the messages that failed for this peer.
        failed_messages: FailedMessages,
    },
}
168
/// Internal publishing configuration, built from a [`MessageAuthenticity`] by the `From`
/// impl in this file.
// The `Signing` variant is much larger than the others, hence the lint allowance.
#[allow(clippy::large_enum_variant)]
enum PublishConfig {
    /// Publishing with a keypair.
    Signing {
        /// Keypair supplied through `MessageAuthenticity::Signed`.
        keypair: Keypair,
        /// Peer id derived from the keypair's public key.
        author: PeerId,
        /// Protobuf encoding of the public key, stored only when that encoding is longer
        /// than 42 bytes; `None` otherwise.
        inline_key: Option<Vec<u8>>,
        /// Sequence-number counter, initialised from the current time.
        last_seq_no: SequenceNumber,
    },
    /// Publishing with a fixed author and no keypair.
    Author(PeerId),
    /// No stored author (from `MessageAuthenticity::RandomAuthor`).
    RandomAuthor,
    /// No stored author (from `MessageAuthenticity::Anonymous`).
    Anonymous,
}
183
/// A strictly increasing `u64` counter used as the sequence number of published messages.
#[derive(Debug)]
struct SequenceNumber(u64);

impl SequenceNumber {
    /// Creates a counter seeded with the current time, expressed as nanoseconds since the
    /// Unix epoch and truncated to `u64`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    fn new() -> Self {
        let since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time to be linear");
        let nanos = since_epoch.as_nanos();

        SequenceNumber(nanos as u64)
    }

    /// Advances the counter by one and returns the new value.
    ///
    /// # Panics
    ///
    /// Panics if the counter is already `u64::MAX`.
    fn next(&mut self) -> u64 {
        match self.0.checked_add(1) {
            Some(bumped) => {
                self.0 = bumped;
                bumped
            }
            None => panic!("to not exhaust u64 space for sequence numbers"),
        }
    }
}
209
210impl PublishConfig {
211 pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
212 match self {
213 Self::Signing { author, .. } => Some(author),
214 Self::Author(author) => Some(author),
215 _ => None,
216 }
217 }
218}
219
220impl From<MessageAuthenticity> for PublishConfig {
221 fn from(authenticity: MessageAuthenticity) -> Self {
222 match authenticity {
223 MessageAuthenticity::Signed(keypair) => {
224 let public_key = keypair.public();
225 let key_enc = public_key.encode_protobuf();
226 let key = if key_enc.len() <= 42 {
227 None
231 } else {
232 Some(key_enc)
234 };
235
236 PublishConfig::Signing {
237 keypair,
238 author: public_key.to_peer_id(),
239 inline_key: key,
240 last_seq_no: SequenceNumber::new(),
241 }
242 }
243 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
244 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
245 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
246 }
247 }
248}
249
/// Network behaviour implementing the gossipsub protocol.
///
/// `D` is the data transform applied to message payloads and `F` is the filter that
/// decides which topics may be subscribed to.
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
    /// Behaviour configuration; consulted throughout for limits, intervals and thresholds.
    config: Config,

    /// Queue of pending `ToSwarm` actions (for example dial requests for explicit peers).
    events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// How locally published messages are attributed; built from the `MessageAuthenticity`
    /// passed to the constructor.
    publish_config: PublishConfig,

    /// Time-bounded cache of message ids already handled. `publish` refuses ids found here
    /// and IHAVE handling skips them.
    duplicate_cache: DuplicateCache<MessageId>,

    /// Details (kind, subscribed topics, connection direction, ...) of every connected peer.
    connected_peers: HashMap<PeerId, PeerDetails>,

    /// Peers registered through `add_explicit_peer`. They are never selected for the mesh
    /// or fanout, and GRAFTs from them are answered with PRUNE.
    explicit_peers: HashSet<PeerId>,

    /// Peers registered through `blacklist_peer`.
    /// NOTE(review): the effect of being blacklisted is implemented outside this section.
    blacklisted_peers: HashSet<PeerId>,

    /// Mesh peers per topic. A topic is present as a key exactly while we are subscribed
    /// to it (`subscribe`/`unsubscribe` add and remove the entry).
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Peers used for publishing to topics we are not subscribed to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Time of the most recent publish to each fanout topic.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Backoff bookkeeping per topic and peer; updated whenever a PRUNE is built.
    backoffs: BackoffStorage,

    /// Cache of recent messages, used to answer IWANT requests and to hold messages
    /// awaiting a validation result.
    mcache: MessageCache,

    /// Heartbeat timer; initially set to the heartbeat interval plus the configured
    /// initial delay.
    heartbeat: Delay,

    /// Starts at zero.
    /// NOTE(review): presumably counts executed heartbeats — incremented outside this section.
    heartbeat_ticks: u64,

    /// NOTE(review): not used in this section; presumably peers learned via peer exchange
    /// that we are dialing — confirm.
    px_peers: HashSet<PeerId>,

    /// Peer scoring state; `Disabled` until `with_peer_score` (or its callback variant)
    /// is called.
    peer_score: PeerScoreState,

    /// Number of IHAVE messages received from each peer (incremented per handled IHAVE).
    /// NOTE(review): presumably reset every heartbeat — reset site not in this section.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Number of message ids requested from each peer via IWANT.
    /// NOTE(review): presumably reset every heartbeat — reset site not in this section.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// Ids of locally published messages, recorded only when publishing anonymously or
    /// with a random author and `allow_self_origin` is disabled.
    published_message_ids: DuplicateCache<MessageId>,

    /// Decides whether a topic may be subscribed to (`can_subscribe`).
    subscription_filter: F,

    /// Transform applied to payloads before publishing (`outbound_transform`).
    data_transform: D,

    /// Optional metrics sink; `None` unless `with_metrics` was called.
    #[cfg(feature = "metrics")]
    metrics: Option<Metrics>,

    /// Per-peer record of failed messages.
    /// NOTE(review): populated and consumed outside this section.
    failed_messages: HashMap<PeerId, FailedMessages>,

    /// Tracks message ids we requested via IWANT and are still waiting for.
    gossip_promises: GossipPromises,
}
347
348impl<D, F> Behaviour<D, F>
349where
350 D: DataTransform + Default,
351 F: TopicSubscriptionFilter + Default,
352{
353 pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
356 Self::new_with_subscription_filter_and_transform(
357 privacy,
358 config,
359 F::default(),
360 D::default(),
361 )
362 }
363}
364
365impl<D, F> Behaviour<D, F>
366where
367 D: DataTransform + Default,
368 F: TopicSubscriptionFilter,
369{
370 pub fn new_with_subscription_filter(
373 privacy: MessageAuthenticity,
374 config: Config,
375 subscription_filter: F,
376 ) -> Result<Self, &'static str> {
377 Self::new_with_subscription_filter_and_transform(
378 privacy,
379 config,
380 subscription_filter,
381 D::default(),
382 )
383 }
384}
385
386impl<D, F> Behaviour<D, F>
387where
388 D: DataTransform,
389 F: TopicSubscriptionFilter + Default,
390{
391 pub fn new_with_transform(
395 privacy: MessageAuthenticity,
396 config: Config,
397 data_transform: D,
398 ) -> Result<Self, &'static str> {
399 Self::new_with_subscription_filter_and_transform(
400 privacy,
401 config,
402 F::default(),
403 data_transform,
404 )
405 }
406}
407
408impl<D, F> Behaviour<D, F>
409where
410 D: DataTransform,
411 F: TopicSubscriptionFilter,
412{
413 pub fn new_with_subscription_filter_and_transform(
417 privacy: MessageAuthenticity,
418 config: Config,
419 subscription_filter: F,
420 data_transform: D,
421 ) -> Result<Self, &'static str> {
422 validate_config(&privacy, config.validation_mode())?;
427
428 Ok(Behaviour {
429 #[cfg(feature = "metrics")]
430 metrics: None,
431 events: VecDeque::new(),
432 publish_config: privacy.into(),
433 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
434 explicit_peers: HashSet::new(),
435 blacklisted_peers: HashSet::new(),
436 mesh: HashMap::new(),
437 fanout: HashMap::new(),
438 fanout_last_pub: HashMap::new(),
439 backoffs: BackoffStorage::new(
440 &config.prune_backoff(),
441 config.heartbeat_interval(),
442 config.backoff_slack(),
443 ),
444 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
445 heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
446 heartbeat_ticks: 0,
447 px_peers: HashSet::new(),
448 peer_score: PeerScoreState::Disabled,
449 count_received_ihave: HashMap::new(),
450 count_sent_iwant: HashMap::new(),
451 connected_peers: HashMap::new(),
452 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
453 config,
454 subscription_filter,
455 data_transform,
456 failed_messages: Default::default(),
457 gossip_promises: Default::default(),
458 })
459 }
460
461 #[cfg(feature = "metrics")]
464 pub fn with_metrics(
465 mut self,
466 metrics_registry: &mut Registry,
467 metrics_config: MetricsConfig,
468 ) -> Self {
469 self.metrics = Some(Metrics::new(metrics_registry, metrics_config));
470 self
471 }
472}
473
474impl<D, F> Behaviour<D, F>
475where
476 D: DataTransform + Send + 'static,
477 F: TopicSubscriptionFilter + Send + 'static,
478{
479 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
481 self.mesh.keys()
482 }
483
484 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
486 self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
487 }
488
489 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
490 let mut res = BTreeSet::new();
491 for peers in self.mesh.values() {
492 res.extend(peers);
493 }
494 res.into_iter()
495 }
496
497 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
499 self.connected_peers
500 .iter()
501 .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
502 }
503
504 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
506 self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
507 }
508
509 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
511 match &self.peer_score {
512 PeerScoreState::Active(peer_score) => Some(peer_score.score_report(peer_id).score),
513 PeerScoreState::Disabled => None,
514 }
515 }
516
517 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
522 let topic_hash = topic.hash();
523 if !self.subscription_filter.can_subscribe(&topic_hash) {
524 return Err(SubscriptionError::NotAllowed);
525 }
526
527 if self.mesh.contains_key(&topic_hash) {
528 tracing::debug!(%topic, "Topic is already in the mesh");
529 return Ok(false);
530 }
531
532 for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
534 tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
535 let event = RpcOut::Subscribe(topic_hash.clone());
536 self.send_message(peer_id, event);
537 }
538
539 self.join(&topic_hash);
542 tracing::debug!(%topic, "Subscribed to topic");
543 Ok(true)
544 }
545
546 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
550 let topic_hash = topic.hash();
551
552 if !self.mesh.contains_key(&topic_hash) {
553 tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
554 return false;
556 }
557
558 for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
560 tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
561 let event = RpcOut::Unsubscribe(topic_hash.clone());
562 self.send_message(peer, event);
563 }
564
565 self.leave(&topic_hash);
568
569 tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
570 true
571 }
572
573 pub fn publish(
575 &mut self,
576 topic: impl Into<TopicHash>,
577 data: impl Into<Vec<u8>>,
578 ) -> Result<MessageId, PublishError> {
579 let data = data.into();
580 let topic = topic.into();
581
582 let transformed_data = self
584 .data_transform
585 .outbound_transform(&topic.clone(), data.clone())?;
586
587 let max_transmit_size_for_topic = self
588 .config
589 .protocol_config()
590 .max_transmit_size_for_topic(&topic);
591
592 if transformed_data.len() > max_transmit_size_for_topic {
594 return Err(PublishError::MessageTooLarge);
595 }
596
597 let mesh_n = self.config.mesh_n_for_topic(&topic);
598 let raw_message = self.build_raw_message(topic, transformed_data)?;
599
600 let msg_id = self.config.message_id(&Message {
602 source: raw_message.source,
603 data, sequence_number: raw_message.sequence_number,
605 topic: raw_message.topic.clone(),
606 });
607
608 if self.duplicate_cache.contains(&msg_id) {
610 tracing::warn!(
613 message_id=%msg_id,
614 "Not publishing a message that has already been published"
615 );
616 return Err(PublishError::Duplicate);
617 }
618
619 tracing::trace!(message_id=%msg_id, "Publishing message");
620
621 let topic_hash = raw_message.topic.clone();
622
623 let mut peers_on_topic = self
624 .connected_peers
625 .iter()
626 .filter(|(_, p)| p.topics.contains(&topic_hash))
627 .map(|(peer_id, _)| peer_id)
628 .peekable();
629
630 if peers_on_topic.peek().is_none() {
631 return Err(PublishError::NoPeersSubscribedToTopic);
632 }
633
634 let mut recipient_peers = HashSet::new();
635 if self.config.flood_publish() {
636 recipient_peers.extend(peers_on_topic.filter(|p| {
638 self.explicit_peers.contains(*p)
639 || !self
640 .peer_score
641 .below_threshold(p, |ts| ts.publish_threshold)
642 .0
643 }));
644 } else {
645 match self.mesh.get(&topic_hash) {
646 Some(mesh_peers) => {
648 let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());
651
652 if needed_extra_peers > 0 {
653 let peer_list = get_random_peers(
658 &self.connected_peers,
659 &topic_hash,
660 needed_extra_peers,
661 |peer| {
662 !mesh_peers.contains(peer)
663 && !self.explicit_peers.contains(peer)
664 && !self
665 .peer_score
666 .below_threshold(peer, |ts| ts.publish_threshold)
667 .0
668 },
669 );
670 recipient_peers.extend(peer_list);
671 }
672
673 recipient_peers.extend(mesh_peers);
674 }
675 None => {
677 tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
678 let fanout_peers = self
680 .fanout
681 .get(&topic_hash)
682 .filter(|peers| !peers.is_empty());
683 if let Some(peers) = fanout_peers {
685 for peer in peers {
686 recipient_peers.insert(*peer);
687 }
688 } else {
689 let new_peers =
691 get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
692 |p| {
693 !self.explicit_peers.contains(p)
694 && !self
695 .peer_score
696 .below_threshold(p, |ts| ts.publish_threshold)
697 .0
698 }
699 });
700 self.fanout.insert(topic_hash.clone(), new_peers.clone());
702 for peer in new_peers {
703 tracing::debug!(%peer, "Peer added to fanout");
704 recipient_peers.insert(peer);
705 }
706 }
707 self.fanout_last_pub
709 .insert(topic_hash.clone(), Instant::now());
710 }
711 }
712
713 recipient_peers
715 .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
716
717 for (peer, connections) in &self.connected_peers {
719 if connections.kind == PeerKind::Floodsub
720 && connections.topics.contains(&topic_hash)
721 && !self
722 .peer_score
723 .below_threshold(peer, |ts| ts.publish_threshold)
724 .0
725 {
726 recipient_peers.insert(*peer);
727 }
728 }
729 }
730
731 self.duplicate_cache.insert(msg_id.clone());
734 self.mcache.put(&msg_id, raw_message.clone());
735
736 self.gossip_promises.message_delivered(&msg_id);
738
739 if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
742 if !self.config.allow_self_origin() {
743 self.published_message_ids.insert(msg_id.clone());
744 }
745 }
746
747 let mut publish_failed = true;
749 for peer_id in recipient_peers.iter() {
750 tracing::trace!(peer=%peer_id, "Sending message to peer");
751 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
754 && self.config.idontwant_on_publish()
755 {
756 self.send_message(
757 *peer_id,
758 RpcOut::IDontWant(IDontWant {
759 message_ids: vec![msg_id.clone()],
760 }),
761 );
762 }
763
764 if self.send_message(
765 *peer_id,
766 RpcOut::Publish {
767 message: raw_message.clone(),
768 timeout: Delay::new(self.config.publish_queue_duration()),
769 },
770 ) {
771 publish_failed = false
772 }
773 }
774
775 if recipient_peers.is_empty() {
776 return Err(PublishError::NoPeersSubscribedToTopic);
777 }
778
779 if publish_failed {
780 return Err(PublishError::AllQueuesFull(recipient_peers.len()));
781 }
782
783 tracing::debug!(message_id=%msg_id, "Published message");
784
785 #[cfg(feature = "metrics")]
786 if let Some(metrics) = self.metrics.as_mut() {
787 metrics.register_published_message(&topic_hash);
788 }
789
790 Ok(msg_id)
791 }
792
793 pub fn report_message_validation_result(
813 &mut self,
814 msg_id: &MessageId,
815 propagation_source: &PeerId,
816 acceptance: MessageAcceptance,
817 ) -> bool {
818 let reject_reason = match acceptance {
819 MessageAcceptance::Accept => {
820 let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
821 Some((raw_message, originating_peers)) => {
822 (raw_message.clone(), originating_peers)
823 }
824 None => {
825 tracing::warn!(
826 message_id=%msg_id,
827 "Message not in cache. Ignoring forwarding"
828 );
829 #[cfg(feature = "metrics")]
830 if let Some(metrics) = self.metrics.as_mut() {
831 metrics.memcache_miss();
832 }
833 return false;
834 }
835 };
836
837 #[cfg(feature = "metrics")]
838 if let Some(metrics) = self.metrics.as_mut() {
839 metrics.register_msg_validation(&raw_message.topic, &acceptance);
840 }
841
842 self.forward_msg(
843 msg_id,
844 raw_message,
845 Some(propagation_source),
846 originating_peers,
847 );
848 return true;
849 }
850 MessageAcceptance::Reject => RejectReason::ValidationFailed,
851 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
852 };
853
854 if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
855 #[cfg(feature = "metrics")]
856 if let Some(metrics) = self.metrics.as_mut() {
857 metrics.register_msg_validation(&raw_message.topic, &acceptance);
858 }
859
860 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
863 peer_score.reject_message(
864 propagation_source,
865 msg_id,
866 &raw_message.topic,
867 reject_reason,
868 );
869 for peer in originating_peers.iter() {
870 peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
871 }
872 }
873 true
874 } else {
875 tracing::warn!(message_id=%msg_id, "Rejected message not in cache");
876 false
877 }
878 }
879
880 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
882 tracing::debug!(peer=%peer_id, "Adding explicit peer");
883
884 self.explicit_peers.insert(*peer_id);
885
886 self.check_explicit_peer_connection(peer_id);
887 }
888
889 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
892 tracing::debug!(peer=%peer_id, "Removing explicit peer");
893 self.explicit_peers.remove(peer_id);
894 }
895
896 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
899 if self.blacklisted_peers.insert(*peer_id) {
900 tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
901 }
902 }
903
904 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
906 if self.blacklisted_peers.remove(peer_id) {
907 tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
908 }
909 }
910
911 pub fn with_peer_score(
915 &mut self,
916 params: PeerScoreParams,
917 threshold: PeerScoreThresholds,
918 ) -> Result<(), String> {
919 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
920 }
921
922 pub fn with_peer_score_and_message_delivery_time_callback(
925 &mut self,
926 params: PeerScoreParams,
927 thresholds: PeerScoreThresholds,
928 callback: Option<fn(&PeerId, &TopicHash, f64)>,
929 ) -> Result<(), String> {
930 params.validate()?;
931 thresholds.validate()?;
932
933 if let PeerScoreState::Active(_) = self.peer_score {
934 return Err("Peer score set twice".into());
935 }
936
937 let peer_score =
938 PeerScore::new_with_message_delivery_time_callback(params, thresholds, callback);
939 self.peer_score = PeerScoreState::Active(Box::new(peer_score));
940 Ok(())
941 }
942
943 pub fn set_topic_params<H: Hasher>(
947 &mut self,
948 topic: Topic<H>,
949 params: TopicScoreParams,
950 ) -> Result<(), &'static str> {
951 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
952 peer_score.set_topic_params(topic.hash(), params);
953 Ok(())
954 } else {
955 Err("Peer score must be initialised with `with_peer_score()`")
956 }
957 }
958
959 pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
961 match &self.peer_score {
962 PeerScoreState::Active(peer_score) => peer_score.get_topic_params(&topic.hash()),
963 PeerScoreState::Disabled => None,
964 }
965 }
966
967 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
970 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
971 peer_score.set_application_score(peer_id, new_score)
972 } else {
973 false
974 }
975 }
976
    /// Builds the mesh for `topic_hash` (the JOIN step of subscribing).
    ///
    /// Existing fanout peers for the topic are promoted into the mesh first (up to
    /// `mesh_n`); if that is not enough, random eligible peers are added. Every peer that
    /// ends up added is sent a GRAFT.
    fn join(&mut self, topic_hash: &TopicHash) {
        let mut added_peers = HashSet::new();
        let mesh_n = self.config.mesh_n_for_topic(topic_hash);
        #[cfg(feature = "metrics")]
        if let Some(m) = self.metrics.as_mut() {
            m.joined(topic_hash)
        }

        // Make sure a mesh entry exists even if no peers are found below; the presence of
        // the key is what marks the topic as subscribed.
        self.mesh.entry(topic_hash.clone()).or_default();

        // If there is a fanout entry for the topic, consume it and move its peers into
        // the mesh.
        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Removing peers from the fanout for topic"
            );

            // Drop explicit peers, peers with a score below zero, and peers in backoff.
            peers.retain(|p| {
                !self.explicit_peers.contains(p)
                    && !self.peer_score.below_threshold(p, |_| 0.0).0
                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
            });

            // Take at most `mesh_n` of them, in the set's iteration order (not random).
            let add_peers = std::cmp::min(peers.len(), mesh_n);
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Adding {:?} peers from the fanout for topic",
                add_peers
            );
            added_peers.extend(peers.iter().take(add_peers));

            self.mesh.insert(
                topic_hash.clone(),
                peers.into_iter().take(add_peers).collect(),
            );

            // The topic is no longer a fanout topic, so forget its last publish time.
            self.fanout_last_pub.remove(topic_hash);
        }

        #[cfg(feature = "metrics")]
        if let Some(m) = self.metrics.as_mut() {
            m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len())
        }

        // Still short of `mesh_n`: top up with randomly selected eligible peers.
        if added_peers.len() < mesh_n {
            let random_added = get_random_peers(
                &self.connected_peers,
                topic_hash,
                mesh_n - added_peers.len(),
                |peer| {
                    !added_peers.contains(peer)
                        && !self.explicit_peers.contains(peer)
                        && !self.peer_score.below_threshold(peer, |_| 0.0).0
                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
                },
            );

            added_peers.extend(random_added.clone());
            tracing::debug!(
                "JOIN: Inserting {:?} random peers into the mesh",
                random_added.len()
            );

            #[cfg(feature = "metrics")]
            if let Some(m) = self.metrics.as_mut() {
                m.peers_included(topic_hash, Inclusion::Random, random_added.len())
            }

            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
            mesh_peers.extend(random_added);
        }

        // Notify every newly added mesh peer.
        for peer_id in added_peers {
            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
            // Tell the scorer about the graft before sending it.
            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                peer_score.graft(&peer_id, topic_hash.clone());
            }
            self.send_message(
                peer_id,
                RpcOut::Graft(Graft {
                    topic_hash: topic_hash.clone(),
                }),
            );

            // Let the free helper queue whatever follow-up events a mesh addition requires.
            peer_added_to_mesh(
                peer_id,
                vec![topic_hash],
                &self.mesh,
                &mut self.events,
                &self.connected_peers,
            );
        }

        #[cfg(feature = "metrics")]
        {
            // Counted first, because `mesh_peers` borrows `self` immutably.
            let mesh_peers = self.mesh_peers(topic_hash).count();
            if let Some(m) = self.metrics.as_mut() {
                m.set_mesh_peers(topic_hash, mesh_peers)
            }
        }

        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
    }
1092
1093 fn make_prune(
1095 &mut self,
1096 topic_hash: &TopicHash,
1097 peer: &PeerId,
1098 do_px: bool,
1099 on_unsubscribe: bool,
1100 ) -> Prune {
1101 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1102 peer_score.prune(peer, topic_hash.clone());
1103 }
1104
1105 match self.connected_peers.get(peer).map(|v| &v.kind) {
1106 Some(PeerKind::Floodsub) => {
1107 tracing::error!("Attempted to prune a Floodsub peer");
1108 }
1109 Some(PeerKind::Gossipsub) => {
1110 return Prune {
1112 topic_hash: topic_hash.clone(),
1113 peers: Vec::new(),
1114 backoff: None,
1115 };
1116 }
1117 None => {
1118 tracing::error!("Attempted to Prune an unknown peer");
1119 }
1120 _ => {} }
1122
1123 let peers = if do_px {
1125 get_random_peers(
1126 &self.connected_peers,
1127 topic_hash,
1128 self.config.prune_peers(),
1129 |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
1130 )
1131 .into_iter()
1132 .map(|p| PeerInfo { peer_id: Some(p) })
1133 .collect()
1134 } else {
1135 Vec::new()
1136 };
1137
1138 let backoff = if on_unsubscribe {
1139 self.config.unsubscribe_backoff()
1140 } else {
1141 self.config.prune_backoff()
1142 };
1143
1144 self.backoffs.update_backoff(topic_hash, peer, backoff);
1146
1147 Prune {
1148 topic_hash: topic_hash.clone(),
1149 peers,
1150 backoff: Some(backoff.as_secs()),
1151 }
1152 }
1153
1154 fn leave(&mut self, topic_hash: &TopicHash) {
1156 tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1157
1158 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1160 #[cfg(feature = "metrics")]
1161 if let Some(m) = self.metrics.as_mut() {
1162 m.left(topic_hash)
1163 }
1164 for peer_id in peers {
1165 tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1167
1168 let on_unsubscribe = true;
1169 let prune =
1170 self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1171 self.send_message(peer_id, RpcOut::Prune(prune));
1172
1173 peer_removed_from_mesh(
1175 peer_id,
1176 topic_hash,
1177 &self.mesh,
1178 &mut self.events,
1179 &self.connected_peers,
1180 );
1181 }
1182 }
1183 tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1184 }
1185
1186 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1188 if !self.connected_peers.contains_key(peer_id) {
1189 tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1191 self.events.push_back(ToSwarm::Dial {
1192 opts: DialOpts::peer_id(*peer_id).build(),
1193 });
1194 }
1195 }
1196
    /// Handles an IHAVE control message from `peer_id`.
    ///
    /// `ihave_msgs` lists, per topic, the message ids the peer advertises. Ids that we
    /// have not seen and are not already waiting for are requested with a single IWANT,
    /// subject to per-peer limits, and a gossip promise is recorded for them.
    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
        // Ignore gossip from peers whose score is below the gossip threshold.
        if let (true, score) = self
            .peer_score
            .below_threshold(peer_id, |ts| ts.gossip_threshold)
        {
            tracing::debug!(
                peer=%peer_id,
                %score,
                "IHAVE: ignoring peer with score below threshold"
            );
            return;
        }

        // Limit the number of IHAVE messages accepted per peer. The counter is bumped even
        // when the message ends up being ignored.
        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
        *peer_have += 1;
        if *peer_have > self.config.max_ihave_messages() {
            tracing::debug!(
                peer=%peer_id,
                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
            interval; ignoring",
                *peer_have
            );
            return;
        }

        // Limit the number of message ids we request from a single peer.
        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
            if *iasked >= self.config.max_ihave_length() {
                tracing::debug!(
                    peer=%peer_id,
                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
                    *iasked
                );
                return;
            }
        }

        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");

        let mut iwant_ids = HashSet::new();

        for (topic, ids) in ihave_msgs {
            // Only topics we are subscribed to (present in the mesh map) are considered.
            if !self.mesh.contains_key(&topic) {
                tracing::debug!(
                    %topic,
                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
                );
                continue;
            }

            // Keep ids we have neither seen nor already requested from someone.
            for id in ids.into_iter().filter(|id| {
                if self.duplicate_cache.contains(id) {
                    return false;
                }

                !self.gossip_promises.contains(id)
            }) {
                // Count each wanted id once, even if advertised under several topics.
                if iwant_ids.insert(id) {
                    #[cfg(feature = "metrics")]
                    if let Some(metrics) = self.metrics.as_mut() {
                        metrics.register_iwant(&topic);
                    }
                }
            }
        }

        if !iwant_ids.is_empty() {
            // Clamp the request so the per-peer total never exceeds `max_ihave_length`.
            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
            let mut iask = iwant_ids.len();
            if *iasked + iask > self.config.max_ihave_length() {
                iask = self.config.max_ihave_length().saturating_sub(*iasked);
            }

            tracing::debug!(
                peer=%peer_id,
                "IHAVE: Asking for {} out of {} messages from peer",
                iask,
                iwant_ids.len()
            );

            // Pick the `iask` ids to request at random: shuffle, then cut off the rest.
            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
            let mut rng = thread_rng();
            iwant_ids_vec.partial_shuffle(&mut rng, iask);

            iwant_ids_vec.truncate(iask);
            *iasked += iask;

            // Record that the peer is expected to deliver these messages within the
            // configured follow-up time.
            self.gossip_promises.add_promise(
                *peer_id,
                &iwant_ids_vec,
                Instant::now() + self.config.iwant_followup_time(),
            );
            tracing::trace!(
                peer=%peer_id,
                "IHAVE: Asking for the following messages from peer: {:?}",
                iwant_ids_vec
            );

            self.send_message(
                *peer_id,
                RpcOut::IWant(IWant {
                    message_ids: iwant_ids_vec,
                }),
            );
        }
        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
    }
1312
1313 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1316 if let (true, score) = self
1318 .peer_score
1319 .below_threshold(peer_id, |ts| ts.gossip_threshold)
1320 {
1321 tracing::debug!(
1322 peer=%peer_id,
1323 "IWANT: ignoring peer with score below threshold [score = {}]",
1324 score
1325 );
1326 return;
1327 }
1328
1329 tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1330
1331 for id in iwant_msgs {
1332 if let Some((msg, count)) = self
1335 .mcache
1336 .get_with_iwant_counts(&id, peer_id)
1337 .map(|(msg, count)| (msg.clone(), count))
1338 {
1339 if count > self.config.gossip_retransimission() {
1340 tracing::debug!(
1341 peer=%peer_id,
1342 message_id=%id,
1343 "IWANT: Peer has asked for message too many times; ignoring request"
1344 );
1345 } else {
1346 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
1347 if peer.dont_send.contains_key(&id) {
1348 tracing::debug!(%peer_id, message_id=%id, "Peer already sent IDONTWANT for this message");
1349 continue;
1350 }
1351 }
1352
1353 tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1354 self.send_message(
1355 *peer_id,
1356 RpcOut::Forward {
1357 message: msg,
1358 timeout: Delay::new(self.config.forward_queue_duration()),
1359 },
1360 );
1361 }
1362 }
1363 }
1364 tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1365 }
1366
    /// Handles a GRAFT control message from `peer_id` for the given `topics`.
    ///
    /// For each topic the peer is either added to our mesh or scheduled for a PRUNE
    /// reply. GRAFTs for topics we have no mesh for are ignored without a PRUNE. All
    /// PRUNEs are sent at the end, with peer exchange disabled if any refusal reason
    /// forbids it.
    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");

        // Topics for which the GRAFT is refused and a PRUNE must be sent back.
        let mut to_prune_topics = HashSet::new();

        // Whether the PRUNEs may carry peer-exchange records; cleared by several of the
        // refusal paths below.
        let mut do_px = self.config.do_px();

        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
            return;
        };
        // Copied out now so the mutable borrow of `connected_peers` can end before other
        // fields of `self` are used.
        let is_outbound = connected_peer.outbound;

        // A peer grafting onto a topic is recorded as subscribed to it.
        for topic in &topics {
            if connected_peer.topics.insert(topic.clone()) {
                #[cfg(feature = "metrics")]
                if let Some(m) = self.metrics.as_mut() {
                    m.inc_topic_peers(topic);
                }
            }
        }

        // Explicit (direct) peers are never grafted: refuse every topic, without PX.
        if self.explicit_peers.contains(peer_id) {
            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
            to_prune_topics = topics.into_iter().collect();
            do_px = false
        } else {
            // The score check against zero is done once for all topics.
            let (below_zero, score) = self.peer_score.below_threshold(peer_id, |_| 0.0);
            let now = Instant::now();
            for topic_hash in topics {
                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
                    // Already in the mesh for this topic: nothing to do.
                    if peers.contains(peer_id) {
                        tracing::debug!(
                            peer=%peer_id,
                            topic=%&topic_hash,
                            "GRAFT: Received graft for peer that is already in topic"
                        );
                        continue;
                    }

                    // Refuse and penalise a GRAFT that arrives while the peer is still
                    // in backoff for this topic.
                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
                    {
                        if backoff_time > now {
                            tracing::warn!(
                                peer=%peer_id,
                                "[Penalty] Peer attempted graft within backoff time, penalizing"
                            );
                            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                                #[cfg(feature = "metrics")]
                                if let Some(metrics) = self.metrics.as_mut() {
                                    metrics.register_score_penalty(Penalty::GraftBackoff);
                                }
                                peer_score.add_penalty(peer_id, 1);

                                // A second penalty applies while `now` is still before
                                // `backoff_time + graft_flood_threshold - prune_backoff`.
                                // NOTE(review): presumably this approximates "prune time +
                                // flood threshold", i.e. a re-GRAFT very soon after the
                                // PRUNE — confirm against how backoff times are stored.
                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
                                let flood_cutoff = (backoff_time
                                    + self.config.graft_flood_threshold())
                                    - self.config.prune_backoff();
                                if flood_cutoff > now {
                                    peer_score.add_penalty(peer_id, 1);
                                }
                            }
                            // No peer exchange for a misbehaving peer.
                            do_px = false;

                            to_prune_topics.insert(topic_hash.clone());
                            continue;
                        }
                    }

                    // Refuse peers with a negative score; they still get a PRUNE, but
                    // without peer exchange.
                    if below_zero {
                        tracing::debug!(
                            peer=%peer_id,
                            %score,
                            topic=%topic_hash,
                            "GRAFT: ignoring peer with negative score"
                        );
                        to_prune_topics.insert(topic_hash.clone());
                        do_px = false;
                        continue;
                    }

                    let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);

                    // Refuse when the mesh is at or above its upper bound, unless the
                    // peer is one we connected to (outbound). Peer exchange stays allowed
                    // for this refusal.
                    if peers.len() >= mesh_n_high && !is_outbound {
                        to_prune_topics.insert(topic_hash.clone());
                        continue;
                    }

                    // Accept the GRAFT.
                    tracing::debug!(
                        peer=%peer_id,
                        topic=%topic_hash,
                        "GRAFT: Mesh link added for peer in topic"
                    );

                    if peers.insert(*peer_id) {
                        #[cfg(feature = "metrics")]
                        if let Some(m) = self.metrics.as_mut() {
                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
                        }
                    }

                    // Let the free helper queue whatever follow-up events a mesh addition
                    // requires.
                    peer_added_to_mesh(
                        *peer_id,
                        vec![&topic_hash],
                        &self.mesh,
                        &mut self.events,
                        &self.connected_peers,
                    );

                    if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                        peer_score.graft(peer_id, topic_hash);
                    }
                } else {
                    // We have no mesh for this topic: no PRUNE is sent for it, but peer
                    // exchange is disabled for any PRUNEs sent for other topics.
                    do_px = false;
                    tracing::debug!(
                        peer=%peer_id,
                        topic=%topic_hash,
                        "GRAFT: Received graft for unknown topic from peer"
                    );
                    continue;
                }
            }
        }

        if !to_prune_topics.is_empty() {
            // These PRUNEs are refusals, not unsubscriptions, so the regular prune
            // backoff is used.
            let on_unsubscribe = false;

            // The PRUNEs are collected first because both building and sending need
            // `&mut self`.
            for prune in to_prune_topics
                .iter()
                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
                .collect::<Vec<_>>()
            {
                self.send_message(*peer_id, RpcOut::Prune(prune));
            }
            // Note: this message is logged for every refusal reason, not only for
            // topics we are not subscribed to.
            tracing::debug!(
                peer=%peer_id,
                "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
            );
        }
        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
    }
1536
1537 fn remove_peer_from_mesh(
1539 &mut self,
1540 peer_id: &PeerId,
1541 topic_hash: &TopicHash,
1542 backoff: Option<u64>,
1543 always_update_backoff: bool,
1544 ) -> bool {
1545 let mut peer_removed = false;
1546 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1547 peer_removed = peers.remove(peer_id);
1549 if peer_removed {
1550 tracing::debug!(
1551 peer=%peer_id,
1552 topic=%topic_hash,
1553 "PRUNE: Removing peer from the mesh for topic"
1554 );
1555
1556 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1557 peer_score.prune(peer_id, topic_hash.clone());
1558 }
1559
1560 peer_removed_from_mesh(
1562 *peer_id,
1563 topic_hash,
1564 &self.mesh,
1565 &mut self.events,
1566 &self.connected_peers,
1567 );
1568 }
1569 }
1570 if always_update_backoff || peer_removed {
1571 let time = if let Some(backoff) = backoff {
1572 Duration::from_secs(backoff)
1573 } else {
1574 self.config.prune_backoff()
1575 };
1576 self.backoffs.update_backoff(topic_hash, peer_id, time);
1578 }
1579 peer_removed
1580 }
1581
1582 fn handle_prune(
1584 &mut self,
1585 peer_id: &PeerId,
1586 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1587 ) {
1588 tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1589 let (below_threshold, score) = self
1590 .peer_score
1591 .below_threshold(peer_id, |ts| ts.accept_px_threshold);
1592 for (topic_hash, px, backoff) in prune_data {
1593 if self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true) {
1594 #[cfg(feature = "metrics")]
1595 if let Some(m) = self.metrics.as_mut() {
1596 m.peers_removed(&topic_hash, Churn::Prune, 1);
1597 }
1598 }
1599
1600 if self.mesh.contains_key(&topic_hash) {
1601 if !px.is_empty() {
1603 if below_threshold {
1605 tracing::debug!(
1606 peer=%peer_id,
1607 %score,
1608 topic=%topic_hash,
1609 "PRUNE: ignoring PX from peer with insufficient score"
1610 );
1611 continue;
1612 }
1613
1614 if self.config.prune_peers() > 0 {
1621 self.px_connect(px);
1622 }
1623 }
1624 }
1625 }
1626 tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1627 }
1628
1629 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1630 let n = self.config.prune_peers();
1631 px.retain(|p| p.peer_id.is_some());
1636 if px.len() > n {
1637 let mut rng = thread_rng();
1639 px.partial_shuffle(&mut rng, n);
1640 px = px.into_iter().take(n).collect();
1641 }
1642
1643 for p in px {
1644 if let Some(peer_id) = p.peer_id {
1647 self.px_peers.insert(peer_id);
1649
1650 self.events.push_back(ToSwarm::Dial {
1652 opts: DialOpts::peer_id(peer_id).build(),
1653 });
1654 }
1655 }
1656 }
1657
1658 fn message_is_valid(
1661 &mut self,
1662 msg_id: &MessageId,
1663 raw_message: &mut RawMessage,
1664 propagation_source: &PeerId,
1665 ) -> bool {
1666 tracing::debug!(
1667 peer=%propagation_source,
1668 message_id=%msg_id,
1669 "Handling message from peer"
1670 );
1671
1672 if self.blacklisted_peers.contains(propagation_source) {
1674 tracing::debug!(
1675 peer=%propagation_source,
1676 "Rejecting message from blacklisted peer"
1677 );
1678 self.gossip_promises
1679 .reject_message(msg_id, &RejectReason::BlackListedPeer);
1680 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1681 peer_score.reject_message(
1682 propagation_source,
1683 msg_id,
1684 &raw_message.topic,
1685 RejectReason::BlackListedPeer,
1686 );
1687 }
1688 return false;
1689 }
1690
1691 if let Some(source) = raw_message.source.as_ref() {
1693 if self.blacklisted_peers.contains(source) {
1694 tracing::debug!(
1695 peer=%propagation_source,
1696 %source,
1697 "Rejecting message from peer because of blacklisted source"
1698 );
1699 self.handle_invalid_message(
1700 propagation_source,
1701 &raw_message.topic,
1702 Some(msg_id),
1703 RejectReason::BlackListedSource,
1704 );
1705 return false;
1706 }
1707 }
1708
1709 if !self.config.validate_messages() {
1713 raw_message.validated = true;
1714 }
1715
1716 let self_published = !self.config.allow_self_origin()
1718 && if let Some(own_id) = self.publish_config.get_own_id() {
1719 own_id != propagation_source
1720 && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1721 } else {
1722 self.published_message_ids.contains(msg_id)
1723 };
1724
1725 if self_published {
1726 tracing::debug!(
1727 message_id=%msg_id,
1728 source=%propagation_source,
1729 "Dropping message claiming to be from self but forwarded from source"
1730 );
1731 self.handle_invalid_message(
1732 propagation_source,
1733 &raw_message.topic,
1734 Some(msg_id),
1735 RejectReason::SelfOrigin,
1736 );
1737 return false;
1738 }
1739
1740 true
1741 }
1742
1743 fn handle_received_message(
1747 &mut self,
1748 mut raw_message: RawMessage,
1749 propagation_source: &PeerId,
1750 ) {
1751 #[cfg(feature = "metrics")]
1753 if let Some(metrics) = self.metrics.as_mut() {
1754 metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1755 }
1756
1757 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1759 Ok(message) => message,
1760 Err(e) => {
1761 tracing::debug!("Invalid message. Transform error: {:?}", e);
1762 self.handle_invalid_message(
1764 propagation_source,
1765 &raw_message.topic,
1766 None,
1767 RejectReason::ValidationError(ValidationError::TransformFailed),
1768 );
1769 return;
1770 }
1771 };
1772
1773 let msg_id = self.config.message_id(&message);
1775
1776 if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1778 let recipient_peers = self
1779 .mesh
1780 .get(&message.topic)
1781 .map(|mesh| mesh.iter())
1782 .unwrap_or_default()
1783 .copied()
1784 .chain(self.gossip_promises.peers_for_message(&msg_id))
1785 .filter(|peer_id| {
1786 peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1787 })
1788 .collect::<Vec<PeerId>>();
1789
1790 for peer_id in recipient_peers {
1791 self.send_message(
1792 peer_id,
1793 RpcOut::IDontWant(IDontWant {
1794 message_ids: vec![msg_id.clone()],
1795 }),
1796 );
1797 }
1798 }
1799
1800 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1804 return;
1805 }
1806
1807 if !self.duplicate_cache.insert(msg_id.clone()) {
1808 tracing::debug!(message_id=%msg_id, "Message already received, ignoring");
1809 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1810 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1811 }
1812 self.mcache.observe_duplicate(&msg_id, propagation_source);
1813 return;
1814 }
1815
1816 tracing::debug!(
1817 message_id=%msg_id,
1818 "Put message in duplicate_cache and resolve promises"
1819 );
1820
1821 #[cfg(feature = "metrics")]
1823 if let Some(metrics) = self.metrics.as_mut() {
1824 metrics.msg_recvd(&message.topic);
1825 }
1826
1827 self.gossip_promises.message_delivered(&msg_id);
1830
1831 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1833 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1834 }
1835
1836 self.mcache.put(&msg_id, raw_message.clone());
1838
1839 #[allow(
1841 clippy::map_entry,
1842 reason = "False positive, see rust-lang/rust-clippy#14449."
1843 )]
1844 if self.mesh.contains_key(&message.topic) {
1845 tracing::debug!("Sending received message to user");
1846 self.events
1847 .push_back(ToSwarm::GenerateEvent(Event::Message {
1848 propagation_source: *propagation_source,
1849 message_id: msg_id.clone(),
1850 message,
1851 }));
1852 } else {
1853 tracing::debug!(
1854 topic=%message.topic,
1855 "Received message on a topic we are not subscribed to"
1856 );
1857 return;
1858 }
1859
1860 if !self.config.validate_messages() {
1862 self.forward_msg(
1863 &msg_id,
1864 raw_message,
1865 Some(propagation_source),
1866 HashSet::new(),
1867 );
1868 tracing::debug!(message_id=%msg_id, "Completed message handling for message");
1869 }
1870 }
1871
1872 fn handle_invalid_message(
1874 &mut self,
1875 propagation_source: &PeerId,
1876 topic_hash: &TopicHash,
1877 message_id: Option<&MessageId>,
1878 reject_reason: RejectReason,
1879 ) {
1880 #[cfg(feature = "metrics")]
1881 if let Some(metrics) = self.metrics.as_mut() {
1882 metrics.register_invalid_message(topic_hash);
1883 }
1884 if let Some(msg_id) = message_id {
1885 self.gossip_promises.reject_message(msg_id, &reject_reason);
1887 }
1888 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1889 if let Some(msg_id) = message_id {
1891 peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1894 } else {
1895 peer_score.reject_invalid_message(propagation_source, topic_hash);
1899 }
1900 }
1901 }
1902
/// Handles the subscription changes announced by `propagation_source`.
///
/// The subscriptions are first passed through the subscription filter. For each remaining
/// subscribe action the topic is recorded for the peer and, if the peer is eligible and the
/// topic's mesh holds fewer than `mesh_n_low` peers, the peer is added to the mesh and a
/// GRAFT is sent right away. For each unsubscribe action the topic is removed from the peer
/// and the peer is dropped from the topic's fanout and mesh. `Event::Subscribed` /
/// `Event::Unsubscribed` events are queued for the application at the end.
fn handle_received_subscriptions(
    &mut self,
    subscriptions: &[Subscription],
    propagation_source: &PeerId,
) {
    tracing::trace!(
        source=%propagation_source,
        "Handling subscriptions: {:?}",
        subscriptions,
    );

    // `(peer, topic)` pairs to remove from fanout and mesh once `peer` is no longer borrowed.
    let mut unsubscribed_peers = Vec::new();

    let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
        tracing::error!(
            peer=%propagation_source,
            "Subscription by unknown peer"
        );
        return;
    };

    // Topics whose mesh the peer was added to; a GRAFT is sent for each of them below.
    let mut topics_to_graft = Vec::new();

    // Application events, queued only after all protocol work is done.
    let mut application_event = Vec::new();

    let filtered_topics = match self
        .subscription_filter
        .filter_incoming_subscriptions(subscriptions, &peer.topics)
    {
        Ok(topics) => topics,
        Err(s) => {
            tracing::error!(
                peer=%propagation_source,
                "Subscription filter error: {}; ignoring RPC from peer",
                s
            );
            return;
        }
    };

    for subscription in filtered_topics {
        let topic_hash = &subscription.topic_hash;

        match subscription.action {
            SubscriptionAction::Subscribe => {
                // `insert` returns true only for a topic the peer was not yet recorded for.
                if peer.topics.insert(topic_hash.clone()) {
                    tracing::debug!(
                        peer=%propagation_source,
                        topic=%topic_hash,
                        "SUBSCRIPTION: Adding gossip peer to topic"
                    );

                    #[cfg(feature = "metrics")]
                    if let Some(m) = self.metrics.as_mut() {
                        m.inc_topic_peers(topic_hash);
                    }
                }

                // The peer is a mesh candidate only if it is not an explicit peer, speaks
                // gossipsub, is not below the zero score threshold and is not backed off
                // for this topic.
                if !self.explicit_peers.contains(propagation_source)
                    && peer.kind.is_gossipsub()
                    && !self
                        .peer_score
                        .below_threshold(propagation_source, |_| 0.0)
                        .0
                    && !self
                        .backoffs
                        .is_backoff_with_slack(topic_hash, propagation_source)
                {
                    if let Some(peers) = self.mesh.get_mut(topic_hash) {
                        let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);

                        // Only fill up an under-populated mesh; `insert` is false if the
                        // peer is already a member.
                        if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
                            tracing::debug!(
                                peer=%propagation_source,
                                topic=%topic_hash,
                                "SUBSCRIPTION: Adding peer to the mesh for topic"
                            );
                            #[cfg(feature = "metrics")]
                            if let Some(m) = self.metrics.as_mut() {
                                m.peers_included(topic_hash, Inclusion::Subscribed, 1)
                            }
                            tracing::debug!(
                                peer=%propagation_source,
                                topic=%topic_hash,
                                "Sending GRAFT to peer for topic"
                            );
                            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                                peer_score.graft(propagation_source, topic_hash.clone());
                            }
                            topics_to_graft.push(topic_hash.clone());
                        }
                    }
                }
                // The application is notified of every subscribe action that passed the
                // filter, whether or not the peer joined a mesh.
                application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
                    peer_id: *propagation_source,
                    topic: topic_hash.clone(),
                }));
            }
            SubscriptionAction::Unsubscribe => {
                if peer.topics.remove(topic_hash) {
                    tracing::debug!(
                        peer=%propagation_source,
                        topic=%topic_hash,
                        "SUBSCRIPTION: Removing gossip peer from topic"
                    );

                    #[cfg(feature = "metrics")]
                    if let Some(m) = self.metrics.as_mut() {
                        m.dec_topic_peers(topic_hash);
                    }
                }

                // Mesh/fanout removal needs `&mut self` and is therefore deferred until
                // after this loop.
                unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
                application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
                    peer_id: *propagation_source,
                    topic: topic_hash.clone(),
                }));
            }
        }
    }

    // Drop unsubscribed peers from the fanout and mesh of the topic. The backoff is only
    // updated if the peer was actually removed from the mesh (`always_update_backoff` is
    // false).
    for (peer_id, topic_hash) in unsubscribed_peers {
        self.fanout
            .get_mut(&topic_hash)
            .map(|peers| peers.remove(&peer_id));
        if self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false) {
            #[cfg(feature = "metrics")]
            if let Some(m) = self.metrics.as_mut() {
                m.peers_removed(&topic_hash, Churn::Unsub, 1);
            }
        };
    }

    // Report the newly joined meshes in a single call.
    let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
    if !topics_joined.is_empty() {
        peer_added_to_mesh(
            *propagation_source,
            topics_joined,
            &self.mesh,
            &mut self.events,
            &self.connected_peers,
        );
    }

    // Send the GRAFTs right here rather than leaving them to a later step.
    for topic_hash in topics_to_graft.into_iter() {
        self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
    }

    // Finally hand the subscription events to the application.
    for event in application_event {
        self.events.push_back(event);
    }

    tracing::trace!(
        source=%propagation_source,
        "Completed handling subscriptions from source"
    );
}
2073
2074 fn apply_iwant_penalties(&mut self) {
2076 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2077 for (peer, count) in self.gossip_promises.get_broken_promises() {
2078 peer_score.add_penalty(&peer, count);
2079 #[cfg(feature = "metrics")]
2080 if let Some(metrics) = self.metrics.as_mut() {
2081 metrics.register_score_penalty(Penalty::BrokenPromise);
2082 }
2083 }
2084 }
2085 }
2086
2087 fn heartbeat(&mut self) {
2089 #[cfg(feature = "metrics")]
2090 let start = Instant::now();
2091
2092 #[cfg(feature = "metrics")]
2096 if let Some(m) = &mut self.metrics {
2097 for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2098 m.observe_priority_queue_size(sender_queue.priority_queue_len());
2099 m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2100 }
2101 }
2102
2103 self.heartbeat_ticks += 1;
2104
2105 let mut to_graft = HashMap::new();
2106 let mut to_prune = HashMap::new();
2107 let mut no_px = HashSet::new();
2108
2109 self.backoffs.heartbeat();
2111
2112 self.count_sent_iwant.clear();
2114 self.count_received_ihave.clear();
2115
2116 self.apply_iwant_penalties();
2118
2119 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2121 for p in self.explicit_peers.clone() {
2122 self.check_explicit_peer_connection(&p);
2123 }
2124 }
2125
2126 let mut scores = HashMap::with_capacity(self.connected_peers.len());
2128 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2129 for peer_id in self.connected_peers.keys() {
2130 #[allow(unused_variables)]
2131 let report = scores
2132 .entry(peer_id)
2133 .or_insert_with(|| peer_score.score_report(peer_id));
2134
2135 #[cfg(feature = "metrics")]
2136 if let Some(metrics) = self.metrics.as_mut() {
2137 for penalty in &report.penalties {
2138 metrics.register_score_penalty(*penalty);
2139 }
2140 }
2141 }
2142 }
2143
2144 for (topic_hash, peers) in self.mesh.iter_mut() {
2146 let explicit_peers = &self.explicit_peers;
2147 let backoffs = &self.backoffs;
2148
2149 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2150 let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
2151 let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
2152 let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);
2153
2154 let mut to_remove_peers = Vec::new();
2158 for peer_id in peers.iter() {
2159 let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2160
2161 #[cfg(feature = "metrics")]
2163 if let Some(metrics) = self.metrics.as_mut() {
2164 metrics.observe_mesh_peers_score(topic_hash, peer_score);
2165 }
2166
2167 if peer_score < 0.0 {
2168 tracing::debug!(
2169 peer=%peer_id,
2170 score=%peer_score,
2171 topic=%topic_hash,
2172 "HEARTBEAT: Prune peer with negative score"
2173 );
2174
2175 let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2176 current_topic.push(topic_hash.clone());
2177 no_px.insert(*peer_id);
2178 to_remove_peers.push(*peer_id);
2179 }
2180 }
2181
2182 #[cfg(feature = "metrics")]
2183 if let Some(m) = self.metrics.as_mut() {
2184 m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2185 }
2186
2187 for peer_id in to_remove_peers {
2188 peers.remove(&peer_id);
2189 }
2190
2191 if peers.len() < mesh_n_low {
2193 tracing::debug!(
2194 topic=%topic_hash,
2195 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2196 peers.len(),
2197 self.config.mesh_n()
2198 );
2199 let desired_peers = mesh_n - peers.len();
2201 let peer_list =
2202 get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2203 !peers.contains(peer)
2204 && !explicit_peers.contains(peer)
2205 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2206 && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2207 });
2208 for peer in &peer_list {
2209 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2210 current_topic.push(topic_hash.clone());
2211 }
2212 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2214 #[cfg(feature = "metrics")]
2215 if let Some(m) = self.metrics.as_mut() {
2216 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2217 }
2218 peers.extend(peer_list);
2219 }
2220
2221 if peers.len() > mesh_n_high {
2223 tracing::debug!(
2224 topic=%topic_hash,
2225 "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
2226 peers.len(),
2227 self.config.mesh_n()
2228 );
2229 let excess_peer_no = peers.len() - mesh_n;
2230
2231 let mut rng = thread_rng();
2233 let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2234 shuffled.shuffle(&mut rng);
2235 shuffled.sort_by(|p1, p2| {
2236 let score_p1 = scores.get(p1).map(|r| r.score).unwrap_or_default();
2237 let score_p2 = scores.get(p2).map(|r| r.score).unwrap_or_default();
2238
2239 score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2240 });
2241 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2243
2244 let mut outbound = shuffled
2246 .iter()
2247 .filter(|peer_id| {
2248 self.connected_peers
2249 .get(peer_id)
2250 .is_some_and(|peer| peer.outbound)
2251 })
2252 .count();
2253
2254 let mut removed = 0;
2257 for peer in shuffled {
2258 if removed == excess_peer_no {
2259 break;
2260 }
2261 if self
2262 .connected_peers
2263 .get(&peer)
2264 .is_some_and(|peer| peer.outbound)
2265 {
2266 if outbound <= mesh_outbound_min {
2267 continue;
2269 }
2270 outbound -= 1;
2272 }
2273
2274 peers.remove(&peer);
2276 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2277 current_topic.push(topic_hash.clone());
2278 removed += 1;
2279 }
2280
2281 #[cfg(feature = "metrics")]
2282 if let Some(m) = self.metrics.as_mut() {
2283 m.peers_removed(topic_hash, Churn::Excess, removed)
2284 }
2285 }
2286
2287 if peers.len() >= mesh_n_low {
2289 let outbound = peers
2291 .iter()
2292 .filter(|peer_id| {
2293 self.connected_peers
2294 .get(peer_id)
2295 .is_some_and(|peer| peer.outbound)
2296 })
2297 .count();
2298
2299 if outbound < mesh_outbound_min {
2301 let needed = mesh_outbound_min - outbound;
2302 let peer_list =
2303 get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| {
2304 !peers.contains(peer_id)
2305 && !explicit_peers.contains(peer_id)
2306 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2307 && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2308 && self
2309 .connected_peers
2310 .get(peer_id)
2311 .is_some_and(|peer| peer.outbound)
2312 });
2313
2314 for peer in &peer_list {
2315 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2316 current_topic.push(topic_hash.clone());
2317 }
2318 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2320 #[cfg(feature = "metrics")]
2321 if let Some(m) = self.metrics.as_mut() {
2322 m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2323 }
2324 peers.extend(peer_list);
2325 }
2326 }
2327
2328 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2330 && peers.len() > 1
2331 {
2332 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2333 let mut peers_by_score: Vec<_> = peers.iter().collect();
2343 peers_by_score.sort_by(|p1, p2| {
2344 let p1_score = scores.get(p1).map(|r| r.score).unwrap_or_default();
2345 let p2_score = scores.get(p2).map(|r| r.score).unwrap_or_default();
2346 p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2347 });
2348
2349 let middle = peers_by_score.len() / 2;
2350 let median = if peers_by_score.len() % 2 == 0 {
2351 let sub_middle_peer = *peers_by_score
2352 .get(middle - 1)
2353 .expect("middle < vector length and middle > 0 since peers.len() > 0");
2354 let sub_middle_score = scores
2355 .get(sub_middle_peer)
2356 .map(|r| r.score)
2357 .unwrap_or_default();
2358 let middle_peer =
2359 *peers_by_score.get(middle).expect("middle < vector length");
2360 let middle_score =
2361 scores.get(middle_peer).map(|r| r.score).unwrap_or_default();
2362
2363 (sub_middle_score + middle_score) * 0.5
2364 } else {
2365 scores
2366 .get(*peers_by_score.get(middle).expect("middle < vector length"))
2367 .map(|r| r.score)
2368 .unwrap_or_default()
2369 };
2370
2371 if median < peer_score.thresholds.opportunistic_graft_threshold {
2374 let peer_list = get_random_peers(
2375 &self.connected_peers,
2376 topic_hash,
2377 self.config.opportunistic_graft_peers(),
2378 |peer_id| {
2379 !peers.contains(peer_id)
2380 && !explicit_peers.contains(peer_id)
2381 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2382 && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2383 > median
2384 },
2385 );
2386 for peer in &peer_list {
2387 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2388 current_topic.push(topic_hash.clone());
2389 }
2390 tracing::debug!(
2392 topic=%topic_hash,
2393 "Opportunistically graft in topic with peers {:?}",
2394 peer_list
2395 );
2396 #[cfg(feature = "metrics")]
2397 if let Some(m) = self.metrics.as_mut() {
2398 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2399 }
2400 peers.extend(peer_list);
2401 }
2402 }
2403 }
2404 #[cfg(feature = "metrics")]
2406 if let Some(m) = self.metrics.as_mut() {
2407 m.set_mesh_peers(topic_hash, peers.len())
2408 }
2409 }
2410
2411 {
2413 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2415 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2416 if *last_pub_time + fanout_ttl < Instant::now() {
2417 tracing::debug!(
2418 topic=%topic_hash,
2419 "HEARTBEAT: Fanout topic removed due to timeout"
2420 );
2421 fanout.remove(topic_hash);
2422 return false;
2423 }
2424 true
2425 });
2426 }
2427
2428 for (topic_hash, peers) in self.fanout.iter_mut() {
2431 let mut to_remove_peers = Vec::new();
2432 let publish_threshold = match &self.peer_score {
2433 PeerScoreState::Active(peer_score) => peer_score.thresholds.publish_threshold,
2434 _ => 0.0,
2435 };
2436 let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2437
2438 for peer_id in peers.iter() {
2439 let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2441 match self.connected_peers.get(peer_id) {
2442 Some(peer) => {
2443 if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2444 tracing::debug!(
2445 topic=%topic_hash,
2446 "HEARTBEAT: Peer removed from fanout for topic"
2447 );
2448 to_remove_peers.push(*peer_id);
2449 }
2450 }
2451 None => {
2452 to_remove_peers.push(*peer_id);
2454 }
2455 }
2456 }
2457 for to_remove in to_remove_peers {
2458 peers.remove(&to_remove);
2459 }
2460
2461 if peers.len() < mesh_n {
2463 tracing::debug!(
2464 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2465 peers.len(),
2466 mesh_n
2467 );
2468 let needed_peers = mesh_n - peers.len();
2469 let explicit_peers = &self.explicit_peers;
2470 let new_peers =
2471 get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2472 !peers.contains(peer_id)
2473 && !explicit_peers.contains(peer_id)
2474 && !self
2475 .peer_score
2476 .below_threshold(peer_id, |ts| ts.publish_threshold)
2477 .0
2478 });
2479 peers.extend(new_peers);
2480 }
2481 }
2482
2483 if let PeerScoreState::Active(peer_score) = &self.peer_score {
2484 tracing::trace!("Mesh message deliveries: {:?}", {
2485 self.mesh
2486 .iter()
2487 .map(|(t, peers)| {
2488 (
2489 t.clone(),
2490 peers
2491 .iter()
2492 .map(|p| {
2493 (*p, peer_score.mesh_message_deliveries(p, t).unwrap_or(0.0))
2494 })
2495 .collect::<HashMap<PeerId, f64>>(),
2496 )
2497 })
2498 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2499 })
2500 }
2501
2502 self.emit_gossip();
2503
2504 if !to_graft.is_empty() | !to_prune.is_empty() {
2506 self.send_graft_prune(to_graft, to_prune, no_px);
2507 }
2508
2509 self.mcache.shift();
2511
2512 for (peer_id, failed_messages) in self.failed_messages.drain() {
2514 tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2515 self.events
2516 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2517 peer_id,
2518 failed_messages,
2519 }));
2520 }
2521 self.failed_messages.shrink_to_fit();
2522
2523 for peer in self.connected_peers.values_mut() {
2525 while let Some((_front, instant)) = peer.dont_send.front() {
2526 if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2527 break;
2528 } else {
2529 peer.dont_send.pop_front();
2530 }
2531 }
2532 }
2533
2534 #[cfg(feature = "metrics")]
2535 if let Some(metrics) = self.metrics.as_mut() {
2536 let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2537 metrics.observe_heartbeat_duration(duration);
2538 }
2539 }
2540
2541 fn emit_gossip(&mut self) {
2544 let mut rng = thread_rng();
2545 let mut messages = Vec::new();
2546 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2547 let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2548 if message_ids.is_empty() {
2549 continue;
2550 }
2551
2552 if message_ids.len() > self.config.max_ihave_length() {
2554 tracing::debug!(
2556 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2557 message_ids.len()
2558 );
2559 } else {
2560 message_ids.shuffle(&mut rng);
2562 }
2563
2564 let n_map = |m| {
2566 max(
2567 self.config.gossip_lazy(),
2568 (self.config.gossip_factor() * m as f64) as usize,
2569 )
2570 };
2571 let to_msg_peers =
2573 get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2574 !peers.contains(peer)
2575 && !self.explicit_peers.contains(peer)
2576 && !self
2577 .peer_score
2578 .below_threshold(peer, |ts| ts.gossip_threshold)
2579 .0
2580 });
2581
2582 tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2583
2584 for peer_id in to_msg_peers {
2585 let mut peer_message_ids = message_ids.clone();
2586
2587 if peer_message_ids.len() > self.config.max_ihave_length() {
2588 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2592 peer_message_ids.truncate(self.config.max_ihave_length());
2593 }
2594
2595 messages.push((
2597 peer_id,
2598 RpcOut::IHave(IHave {
2599 topic_hash: topic_hash.clone(),
2600 message_ids: peer_message_ids,
2601 }),
2602 ));
2603 }
2604 }
2605 for (peer_id, message) in messages {
2606 self.send_message(peer_id, message);
2607 }
2608 }
2609
2610 fn send_graft_prune(
2613 &mut self,
2614 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2615 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2616 no_px: HashSet<PeerId>,
2617 ) {
2618 for (peer_id, topics) in to_graft.into_iter() {
2620 for topic in &topics {
2621 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2623 peer_score.graft(&peer_id, topic.clone());
2624 }
2625
2626 peer_added_to_mesh(
2629 peer_id,
2630 vec![topic],
2631 &self.mesh,
2632 &mut self.events,
2633 &self.connected_peers,
2634 );
2635 }
2636 let rpc_msgs = topics.iter().map(|topic_hash| {
2637 RpcOut::Graft(Graft {
2638 topic_hash: topic_hash.clone(),
2639 })
2640 });
2641
2642 let prune_msgs = to_prune
2649 .remove(&peer_id)
2650 .into_iter()
2651 .flatten()
2652 .map(|topic_hash| {
2653 let prune = self.make_prune(
2654 &topic_hash,
2655 &peer_id,
2656 self.config.do_px() && !no_px.contains(&peer_id),
2657 false,
2658 );
2659 RpcOut::Prune(prune)
2660 });
2661
2662 for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2664 self.send_message(peer_id, msg);
2665 }
2666 }
2667
2668 for (peer_id, topics) in to_prune.iter() {
2671 for topic_hash in topics {
2672 let prune = self.make_prune(
2673 topic_hash,
2674 peer_id,
2675 self.config.do_px() && !no_px.contains(peer_id),
2676 false,
2677 );
2678 self.send_message(*peer_id, RpcOut::Prune(prune));
2679
2680 peer_removed_from_mesh(
2682 *peer_id,
2683 topic_hash,
2684 &self.mesh,
2685 &mut self.events,
2686 &self.connected_peers,
2687 );
2688 }
2689 }
2690 }
2691
2692 fn forward_msg(
2696 &mut self,
2697 msg_id: &MessageId,
2698 message: RawMessage,
2699 propagation_source: Option<&PeerId>,
2700 originating_peers: HashSet<PeerId>,
2701 ) -> bool {
2702 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2704 if let Some(peer) = propagation_source {
2705 peer_score.deliver_message(peer, msg_id, &message.topic);
2706 }
2707 }
2708
2709 tracing::debug!(message_id=%msg_id, "Forwarding message");
2710 let mut recipient_peers = HashSet::new();
2711
2712 for (peer_id, peer) in &self.connected_peers {
2716 if Some(peer_id) != propagation_source
2717 && !originating_peers.contains(peer_id)
2718 && Some(peer_id) != message.source.as_ref()
2719 && peer.topics.contains(&message.topic)
2720 && (self.explicit_peers.contains(peer_id)
2721 || (peer.kind == PeerKind::Floodsub
2722 && !self
2723 .peer_score
2724 .below_threshold(peer_id, |ts| ts.publish_threshold)
2725 .0))
2726 {
2727 recipient_peers.insert(*peer_id);
2728 }
2729 }
2730
2731 let topic = &message.topic;
2733 if let Some(mesh_peers) = self.mesh.get(topic) {
2735 for peer_id in mesh_peers {
2736 if Some(peer_id) != propagation_source
2737 && !originating_peers.contains(peer_id)
2738 && Some(peer_id) != message.source.as_ref()
2739 {
2740 recipient_peers.insert(*peer_id);
2741 }
2742 }
2743 }
2744
2745 if recipient_peers.is_empty() {
2746 return false;
2747 }
2748
2749 for peer_id in recipient_peers.iter() {
2751 if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2752 if peer.dont_send.contains_key(msg_id) {
2753 tracing::debug!(%peer_id, message_id=%msg_id, "Peer doesn't want message");
2754 continue;
2755 }
2756
2757 tracing::debug!(%peer_id, message_id=%msg_id, "Sending message to peer");
2758
2759 self.send_message(
2760 *peer_id,
2761 RpcOut::Forward {
2762 message: message.clone(),
2763 timeout: Delay::new(self.config.forward_queue_duration()),
2764 },
2765 );
2766 }
2767 }
2768 tracing::debug!("Completed forwarding message");
2769 true
2770 }
2771
2772 pub(crate) fn build_raw_message(
2774 &mut self,
2775 topic: TopicHash,
2776 data: Vec<u8>,
2777 ) -> Result<RawMessage, PublishError> {
2778 match &mut self.publish_config {
2779 PublishConfig::Signing {
2780 ref keypair,
2781 author,
2782 inline_key,
2783 last_seq_no,
2784 } => {
2785 let sequence_number = last_seq_no.next();
2786
2787 let signature = {
2788 let message = proto::Message {
2789 from: Some(author.to_bytes()),
2790 data: Some(data.clone()),
2791 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2792 topic: topic.clone().into_string(),
2793 signature: None,
2794 key: None,
2795 };
2796
2797 let mut buf = Vec::with_capacity(message.get_size());
2798 let mut writer = Writer::new(&mut buf);
2799
2800 message
2801 .write_message(&mut writer)
2802 .expect("Encoding to succeed");
2803
2804 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2806 signature_bytes.extend_from_slice(&buf);
2807 Some(keypair.sign(&signature_bytes)?)
2808 };
2809
2810 Ok(RawMessage {
2811 source: Some(*author),
2812 data,
2813 sequence_number: Some(sequence_number),
2816 topic,
2817 signature,
2818 key: inline_key.clone(),
2819 validated: true, })
2821 }
2822 PublishConfig::Author(peer_id) => {
2823 Ok(RawMessage {
2824 source: Some(*peer_id),
2825 data,
2826 sequence_number: Some(rand::random()),
2829 topic,
2830 signature: None,
2831 key: None,
2832 validated: true, })
2834 }
2835 PublishConfig::RandomAuthor => {
2836 Ok(RawMessage {
2837 source: Some(PeerId::random()),
2838 data,
2839 sequence_number: Some(rand::random()),
2842 topic,
2843 signature: None,
2844 key: None,
2845 validated: true, })
2847 }
2848 PublishConfig::Anonymous => {
2849 Ok(RawMessage {
2850 source: None,
2851 data,
2852 sequence_number: None,
2855 topic,
2856 signature: None,
2857 key: None,
2858 validated: true, })
2860 }
2861 }
2862 }
2863
2864 fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2871 #[cfg(feature = "metrics")]
2872 if let Some(m) = self.metrics.as_mut() {
2873 if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2874 m.msg_sent(&message.topic, message.raw_protobuf_len());
2876 }
2877 }
2878
2879 let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2880 tracing::error!(peer = %peer_id,
2881 "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2882 return false;
2883 };
2884
2885 if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2886 tracing::trace!(peer=%peer_id, "Won't send IDONTWANT message for message to peer as it doesn't support Gossipsub v1.2");
2887 return false;
2888 }
2889
2890 match peer.sender.send_message(rpc) {
2892 Ok(()) => true,
2893 Err(rpc) => {
2894 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2896
2897 let failed_messages = self.failed_messages.entry(peer_id).or_default();
2899 match rpc {
2900 RpcOut::Publish { .. } => {
2901 failed_messages.priority += 1;
2902 failed_messages.publish += 1;
2903 }
2904 RpcOut::Forward { .. } => {
2905 failed_messages.non_priority += 1;
2906 failed_messages.forward += 1;
2907 }
2908 RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2909 failed_messages.non_priority += 1;
2910 }
2911 RpcOut::Graft(_)
2912 | RpcOut::Prune(_)
2913 | RpcOut::Subscribe(_)
2914 | RpcOut::Unsubscribe(_) => {
2915 unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2916 }
2917 }
2918
2919 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2921 peer_score.failed_message_slow_peer(&peer_id);
2922 }
2923
2924 false
2925 }
2926 }
2927 }
2928
2929 fn on_connection_established(
2930 &mut self,
2931 ConnectionEstablished {
2932 peer_id,
2933 endpoint,
2934 other_established,
2935 ..
2936 }: ConnectionEstablished,
2937 ) {
2938 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2940 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2941 peer_score.add_ip(&peer_id, ip);
2942 } else {
2943 tracing::trace!(
2944 peer=%peer_id,
2945 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2946 endpoint
2947 )
2948 }
2949 }
2950
2951 if other_established > 0 {
2952 return; }
2954
2955 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2956 peer_score.add_peer(peer_id);
2957 }
2958
2959 if self.blacklisted_peers.contains(&peer_id) {
2961 tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2962 return;
2963 }
2964
2965 tracing::debug!(peer=%peer_id, "New peer connected");
2966 for topic_hash in self.mesh.clone().into_keys() {
2968 self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2969 }
2970 }
2971
2972 fn on_connection_closed(
2973 &mut self,
2974 ConnectionClosed {
2975 peer_id,
2976 connection_id,
2977 endpoint,
2978 remaining_established,
2979 ..
2980 }: ConnectionClosed,
2981 ) {
2982 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2984 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2985 peer_score.remove_ip(&peer_id, &ip);
2986 } else {
2987 tracing::trace!(
2988 peer=%peer_id,
2989 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2990 endpoint
2991 )
2992 }
2993 }
2994
2995 if remaining_established != 0 {
2996 if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2998 let index = peer
2999 .connections
3000 .iter()
3001 .position(|v| v == &connection_id)
3002 .expect("Previously established connection to peer must be present");
3003 peer.connections.remove(index);
3004
3005 if !peer.connections.is_empty() {
3008 for topic in &peer.topics {
3009 if let Some(mesh_peers) = self.mesh.get(topic) {
3010 if mesh_peers.contains(&peer_id) {
3011 self.events.push_back(ToSwarm::NotifyHandler {
3012 peer_id,
3013 event: HandlerIn::JoinedMesh,
3014 handler: NotifyHandler::One(peer.connections[0]),
3015 });
3016 break;
3017 }
3018 }
3019 }
3020 }
3021 }
3022 } else {
3023 tracing::debug!(peer=%peer_id, "Peer disconnected");
3025 let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
3026 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
3027 return;
3028 };
3029
3030 for topic in &connected_peer.topics {
3032 if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3034 if mesh_peers.remove(&peer_id) {
3036 #[cfg(feature = "metrics")]
3037 if let Some(m) = self.metrics.as_mut() {
3038 m.peers_removed(topic, Churn::Dc, 1);
3039 m.set_mesh_peers(topic, mesh_peers.len());
3040 }
3041 };
3042 }
3043
3044 #[cfg(feature = "metrics")]
3045 if let Some(m) = self.metrics.as_mut() {
3046 m.dec_topic_peers(topic);
3047 }
3048
3049 self.fanout
3051 .get_mut(topic)
3052 .map(|peers| peers.remove(&peer_id));
3053 }
3054
3055 self.px_peers.remove(&peer_id);
3057
3058 #[cfg(feature = "metrics")]
3060 if let Some(metrics) = self.metrics.as_mut() {
3061 metrics.peer_protocol_disconnected(connected_peer.kind);
3062 }
3063
3064 self.connected_peers.remove(&peer_id);
3065
3066 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3067 peer_score.remove_peer(&peer_id);
3068 }
3069 }
3070 }
3071
3072 fn on_address_change(
3073 &mut self,
3074 AddressChange {
3075 peer_id,
3076 old: endpoint_old,
3077 new: endpoint_new,
3078 ..
3079 }: AddressChange,
3080 ) {
3081 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3083 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3084 peer_score.remove_ip(&peer_id, &ip);
3085 } else {
3086 tracing::trace!(
3087 peer=%&peer_id,
3088 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3089 endpoint_old
3090 )
3091 }
3092 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3093 peer_score.add_ip(&peer_id, ip);
3094 } else {
3095 tracing::trace!(
3096 peer=%peer_id,
3097 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
3098 endpoint_new
3099 )
3100 }
3101 }
3102 }
3103
/// Passes `topics` to the metrics as allowed topics.
///
/// Does nothing when no metrics are configured.
#[cfg(feature = "metrics")]
pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
    let Some(metrics) = self.metrics.as_mut() else {
        return;
    };
    metrics.register_allowed_topics(topics);
}
3111}
3112
3113fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3114 addr.iter().find_map(|p| match p {
3115 Ip4(addr) => Some(IpAddr::V4(addr)),
3116 Ip6(addr) => Some(IpAddr::V6(addr)),
3117 _ => None,
3118 })
3119}
3120
3121impl<C, F> NetworkBehaviour for Behaviour<C, F>
3122where
3123 C: Send + 'static + DataTransform,
3124 F: Send + 'static + TopicSubscriptionFilter,
3125{
3126 type ConnectionHandler = Handler;
3127 type ToSwarm = Event;
3128
3129 fn handle_established_inbound_connection(
3130 &mut self,
3131 connection_id: ConnectionId,
3132 peer_id: PeerId,
3133 _: &Multiaddr,
3134 _: &Multiaddr,
3135 ) -> Result<THandler<Self>, ConnectionDenied> {
3136 let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3142 kind: PeerKind::Floodsub,
3143 connections: vec![],
3144 outbound: false,
3145 sender: Sender::new(self.config.connection_handler_queue_len()),
3146 topics: Default::default(),
3147 dont_send: LinkedHashMap::new(),
3148 });
3149 connected_peer.connections.push(connection_id);
3151
3152 Ok(Handler::new(
3153 self.config.protocol_config(),
3154 connected_peer.sender.new_receiver(),
3155 ))
3156 }
3157
3158 fn handle_established_outbound_connection(
3159 &mut self,
3160 connection_id: ConnectionId,
3161 peer_id: PeerId,
3162 _: &Multiaddr,
3163 _: Endpoint,
3164 _: PortUse,
3165 ) -> Result<THandler<Self>, ConnectionDenied> {
3166 let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3167 kind: PeerKind::Floodsub,
3168 connections: vec![],
3169 outbound: !self.px_peers.contains(&peer_id),
3172 sender: Sender::new(self.config.connection_handler_queue_len()),
3173 topics: Default::default(),
3174 dont_send: LinkedHashMap::new(),
3175 });
3176 connected_peer.connections.push(connection_id);
3178
3179 Ok(Handler::new(
3180 self.config.protocol_config(),
3181 connected_peer.sender.new_receiver(),
3182 ))
3183 }
3184
/// Processes an event emitted by the connection handler of `propagation_source`.
///
/// * `PeerKind`: records the peer's protocol kind, or emits `GossipsubNotSupported`.
/// * `MessageDropped`: accounts for a message the handler dropped.
/// * `Message`: processes a received RPC (subscriptions, invalid messages, messages and
///   control messages, in that order).
fn on_connection_handler_event(
    &mut self,
    propagation_source: PeerId,
    _connection_id: ConnectionId,
    handler_event: THandlerOutEvent<Self>,
) {
    match handler_event {
        HandlerEvent::PeerKind(kind) => {
            // The metric is recorded for every reported kind, including `NotSupported`.
            #[cfg(feature = "metrics")]
            if let Some(metrics) = self.metrics.as_mut() {
                metrics.peer_protocol_connected(kind);
            }

            if let PeerKind::NotSupported = kind {
                tracing::debug!(
                    peer=%propagation_source,
                    "Peer does not support gossipsub protocols"
                );
                self.events
                    .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
                        peer_id: propagation_source,
                    }));
            } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
                tracing::debug!(
                    peer=%propagation_source,
                    peer_type=%kind,
                    "New peer type found for peer"
                );
                // Only a peer still recorded as `Floodsub` has its kind replaced; any other
                // recorded kind is left untouched.
                if let PeerKind::Floodsub = conn.kind {
                    conn.kind = kind;
                }
            }
        }
        HandlerEvent::MessageDropped(rpc) => {
            // A dropped message is reported to the score tracker as a slow-peer failure.
            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                peer_score.failed_message_slow_peer(&propagation_source);
            }

            // Every drop counts as a timeout; publish/forward drops are also counted per kind.
            let failed_messages = self.failed_messages.entry(propagation_source).or_default();
            failed_messages.timeout += 1;
            match rpc {
                RpcOut::Publish { .. } => {
                    failed_messages.publish += 1;
                }
                RpcOut::Forward { .. } => {
                    failed_messages.forward += 1;
                }
                _ => {}
            }

            // Per-topic drop metrics for published and forwarded messages.
            #[cfg(feature = "metrics")]
            if let Some(metrics) = self.metrics.as_mut() {
                match rpc {
                    RpcOut::Publish { message, .. } => {
                        metrics.publish_msg_dropped(&message.topic);
                        metrics.timeout_msg_dropped(&message.topic);
                    }
                    RpcOut::Forward { message, .. } => {
                        metrics.forward_msg_dropped(&message.topic);
                        metrics.timeout_msg_dropped(&message.topic);
                    }
                    _ => {}
                }
            }
        }
        HandlerEvent::Message {
            rpc,
            invalid_messages,
        } => {
            // Subscriptions are handled before the graylist check below, so they are
            // processed even for peers whose remaining RPC content gets dropped.
            if !rpc.subscriptions.is_empty() {
                self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
            }

            // Drop the rest of the RPC if the sender is below the graylist threshold.
            if let (true, _) = self
                .peer_score
                .below_threshold(&propagation_source, |pst| pst.graylist_threshold)
            {
                tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
                return;
            }

            // Invalid messages are passed to `handle_invalid_message` when scoring is
            // active and only logged otherwise.
            if let PeerScoreState::Active(_) = self.peer_score {
                for (raw_message, validation_error) in invalid_messages {
                    self.handle_invalid_message(
                        &propagation_source,
                        &raw_message.topic,
                        None,
                        RejectReason::ValidationError(validation_error),
                    )
                }
            } else {
                for (message, validation_error) in invalid_messages {
                    tracing::warn!(
                        peer=%propagation_source,
                        source=?message.source,
                        "Invalid message from peer. Reason: {:?}",
                        validation_error,
                    );
                }
            }

            // Handle the received messages, up to the configured per-RPC maximum (if any).
            for (count, raw_message) in rpc.messages.into_iter().enumerate() {
                if self
                    .config
                    .max_messages_per_rpc()
                    .is_some_and(|max_msg| count >= max_msg)
                {
                    tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
                    break;
                }
                self.handle_received_message(raw_message, &propagation_source);
            }

            // Control messages: IWANT and IDONTWANT are handled as they are encountered,
            // while IHAVE, GRAFT and PRUNE are collected and handled in one batch each after
            // the loop. The same per-RPC maximum applies to the control messages.
            let mut ihave_msgs = vec![];
            let mut graft_msgs = vec![];
            let mut prune_msgs = vec![];
            for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
                if self
                    .config
                    .max_messages_per_rpc()
                    .is_some_and(|max_msg| count >= max_msg)
                {
                    tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
                    break;
                }

                match control_msg {
                    ControlAction::IHave(IHave {
                        topic_hash,
                        message_ids,
                    }) => {
                        ihave_msgs.push((topic_hash, message_ids));
                    }
                    ControlAction::IWant(IWant { message_ids }) => {
                        self.handle_iwant(&propagation_source, message_ids)
                    }
                    ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
                    ControlAction::Prune(Prune {
                        topic_hash,
                        peers,
                        backoff,
                    }) => prune_msgs.push((topic_hash, peers, backoff)),
                    ControlAction::IDontWant(IDontWant { message_ids }) => {
                        let Some(peer) = self.connected_peers.get_mut(&propagation_source)
                        else {
                            tracing::error!(peer = %propagation_source,
                                "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
                            continue;
                        };
                        #[cfg(feature = "metrics")]
                        if let Some(metrics) = self.metrics.as_mut() {
                            metrics.register_idontwant(message_ids.len());
                        }
                        // Record each id with the current time, evicting from the front of
                        // the map once it holds more than `IDONTWANT_CAP` entries.
                        for message_id in message_ids {
                            peer.dont_send.insert(message_id, Instant::now());
                            if peer.dont_send.len() > IDONTWANT_CAP {
                                peer.dont_send.pop_front();
                            }
                        }
                    }
                }
            }
            if !ihave_msgs.is_empty() {
                self.handle_ihave(&propagation_source, ihave_msgs);
            }
            if !graft_msgs.is_empty() {
                self.handle_graft(&propagation_source, graft_msgs);
            }
            if !prune_msgs.is_empty() {
                self.handle_prune(&propagation_source, prune_msgs);
            }
        }
    }
}
3381
3382 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3383 fn poll(
3384 &mut self,
3385 cx: &mut Context<'_>,
3386 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3387 if let Some(event) = self.events.pop_front() {
3388 return Poll::Ready(event);
3389 }
3390
3391 if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3393 if peer_score.decay_interval.poll_unpin(cx).is_ready() {
3394 peer_score.refresh_scores();
3395 peer_score
3396 .decay_interval
3397 .reset(peer_score.params.decay_interval);
3398 }
3399 }
3400
3401 if self.heartbeat.poll_unpin(cx).is_ready() {
3402 self.heartbeat();
3403 self.heartbeat.reset(self.config.heartbeat_interval());
3404 }
3405
3406 Poll::Pending
3407 }
3408
3409 fn on_swarm_event(&mut self, event: FromSwarm) {
3410 match event {
3411 FromSwarm::ConnectionEstablished(connection_established) => {
3412 self.on_connection_established(connection_established)
3413 }
3414 FromSwarm::ConnectionClosed(connection_closed) => {
3415 self.on_connection_closed(connection_closed)
3416 }
3417 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3418 _ => {}
3419 }
3420 }
3421}
3422
3423fn peer_added_to_mesh(
3427 peer_id: PeerId,
3428 new_topics: Vec<&TopicHash>,
3429 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3430 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3431 connections: &HashMap<PeerId, PeerDetails>,
3432) {
3433 let connection_id = match connections.get(&peer_id) {
3435 Some(p) => p
3436 .connections
3437 .first()
3438 .expect("There should be at least one connection to a peer."),
3439 None => {
3440 tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3441 return;
3442 }
3443 };
3444
3445 if let Some(peer) = connections.get(&peer_id) {
3446 for topic in &peer.topics {
3447 if !new_topics.contains(&topic) {
3448 if let Some(mesh_peers) = mesh.get(topic) {
3449 if mesh_peers.contains(&peer_id) {
3450 return;
3452 }
3453 }
3454 }
3455 }
3456 }
3457 events.push_back(ToSwarm::NotifyHandler {
3459 peer_id,
3460 event: HandlerIn::JoinedMesh,
3461 handler: NotifyHandler::One(*connection_id),
3462 });
3463}
3464
3465fn peer_removed_from_mesh(
3469 peer_id: PeerId,
3470 old_topic: &TopicHash,
3471 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3472 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3473 connections: &HashMap<PeerId, PeerDetails>,
3474) {
3475 let connection_id = match connections.get(&peer_id) {
3477 Some(p) => p
3478 .connections
3479 .first()
3480 .expect("There should be at least one connection to a peer."),
3481 None => {
3482 tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3483 return;
3484 }
3485 };
3486
3487 if let Some(peer) = connections.get(&peer_id) {
3488 for topic in &peer.topics {
3489 if topic != old_topic {
3490 if let Some(mesh_peers) = mesh.get(topic) {
3491 if mesh_peers.contains(&peer_id) {
3492 return;
3494 }
3495 }
3496 }
3497 }
3498 }
3499 events.push_back(ToSwarm::NotifyHandler {
3501 peer_id,
3502 event: HandlerIn::LeftMesh,
3503 handler: NotifyHandler::One(*connection_id),
3504 });
3505}
3506
3507fn get_random_peers_dynamic(
3511 connected_peers: &HashMap<PeerId, PeerDetails>,
3512 topic_hash: &TopicHash,
3513 n_map: impl Fn(usize) -> usize,
3515 mut f: impl FnMut(&PeerId) -> bool,
3516) -> BTreeSet<PeerId> {
3517 let mut gossip_peers = connected_peers
3518 .iter()
3519 .filter(|(_, p)| p.topics.contains(topic_hash))
3520 .filter(|(peer_id, _)| f(peer_id))
3521 .filter(|(_, p)| p.kind.is_gossipsub())
3522 .map(|(peer_id, _)| *peer_id)
3523 .collect::<Vec<PeerId>>();
3524
3525 let n = n_map(gossip_peers.len());
3527 if gossip_peers.len() <= n {
3528 tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3529 return gossip_peers.into_iter().collect();
3530 }
3531
3532 let mut rng = thread_rng();
3534 gossip_peers.partial_shuffle(&mut rng, n);
3535
3536 tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3537
3538 gossip_peers.into_iter().take(n).collect()
3539}
3540
3541fn get_random_peers(
3544 connected_peers: &HashMap<PeerId, PeerDetails>,
3545 topic_hash: &TopicHash,
3546 n: usize,
3547 f: impl FnMut(&PeerId) -> bool,
3548) -> BTreeSet<PeerId> {
3549 get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3550}
3551
3552fn validate_config(
3555 authenticity: &MessageAuthenticity,
3556 validation_mode: &ValidationMode,
3557) -> Result<(), &'static str> {
3558 match validation_mode {
3559 ValidationMode::Anonymous => {
3560 if authenticity.is_signing() {
3561 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3562 }
3563
3564 if !authenticity.is_anonymous() {
3565 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3566 }
3567 }
3568 ValidationMode::Strict => {
3569 if !authenticity.is_signing() {
3570 return Err(
3571 "Messages will be
3572 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3573 the validation or privacy settings in the config"
3574 );
3575 }
3576 }
3577 _ => {}
3578 }
3579 Ok(())
3580}
3581
3582impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3583 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3584 f.debug_struct("Behaviour")
3585 .field("config", &self.config)
3586 .field("events", &self.events.len())
3587 .field("publish_config", &self.publish_config)
3588 .field("mesh", &self.mesh)
3589 .field("fanout", &self.fanout)
3590 .field("fanout_last_pub", &self.fanout_last_pub)
3591 .field("mcache", &self.mcache)
3592 .field("heartbeat", &self.heartbeat)
3593 .finish()
3594 }
3595}
3596
3597impl fmt::Debug for PublishConfig {
3598 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3599 match self {
3600 PublishConfig::Signing { author, .. } => {
3601 f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3602 }
3603 PublishConfig::Author(author) => {
3604 f.write_fmt(format_args!("PublishConfig::Author({author})"))
3605 }
3606 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3607 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3608 }
3609 }
3610}