1use std::{
22 cmp::{max, Ordering},
23 collections::HashSet,
24 collections::VecDeque,
25 collections::{BTreeSet, HashMap},
26 fmt,
27 iter::FromIterator,
28 net::IpAddr,
29 sync::Arc,
30 task::{Context, Poll},
31 time::Duration,
32};
33
34use futures::StreamExt;
35use log::{debug, error, info, trace, warn};
36use prost::Message;
37use rand::{seq::SliceRandom, thread_rng};
38use wasm_timer::{Instant, Interval};
39
40use libp2p_core::{
41 connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
42 multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
43};
44use libp2p_swarm::{
45 DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
46 ProtocolsHandler,
47};
48
49use crate::backoff::BackoffStorage;
50use crate::config::{GossipsubConfig, ValidationMode};
51use crate::error::{PublishError, SubscriptionError, ValidationError};
52use crate::gossip_promises::GossipPromises;
53use crate::handler::{GossipsubHandler, HandlerEvent};
54use crate::mcache::MessageCache;
55use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
56use crate::protocol::SIGNING_PREFIX;
57use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
58use crate::time_cache::{DuplicateCache, TimeCache};
59use crate::topic::{Hasher, Topic, TopicHash};
60use crate::transform::{DataTransform, IdentityTransform};
61use crate::types::{
62 FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
63 GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
64};
65use crate::types::{GossipsubRpc, PeerKind};
66use crate::{rpc_proto, TopicScoreParams};
67use std::{cmp::Ordering::Equal, fmt::Debug};
68
69#[cfg(test)]
70mod tests;
71
72#[derive(Clone)]
79pub enum MessageAuthenticity {
80 Signed(Keypair),
83 Author(PeerId),
88 RandomAuthor,
93 Anonymous,
103}
104
105impl MessageAuthenticity {
106 pub fn is_signing(&self) -> bool {
108 matches!(self, MessageAuthenticity::Signed(_))
109 }
110
111 pub fn is_anonymous(&self) -> bool {
112 matches!(self, MessageAuthenticity::Anonymous)
113 }
114}
115
116#[derive(Debug)]
118pub enum GossipsubEvent {
119 Message {
121 propagation_source: PeerId,
123 message_id: MessageId,
126 message: GossipsubMessage,
128 },
129 Subscribed {
131 peer_id: PeerId,
133 topic: TopicHash,
135 },
136 Unsubscribed {
138 peer_id: PeerId,
140 topic: TopicHash,
142 },
143}
144
145enum PublishConfig {
148 Signing {
149 keypair: Keypair,
150 author: PeerId,
151 inline_key: Option<Vec<u8>>,
152 },
153 Author(PeerId),
154 RandomAuthor,
155 Anonymous,
156}
157
158impl PublishConfig {
159 pub fn get_own_id(&self) -> Option<&PeerId> {
160 match self {
161 Self::Signing { author, .. } => Some(&author),
162 Self::Author(author) => Some(&author),
163 _ => None,
164 }
165 }
166}
167
168impl From<MessageAuthenticity> for PublishConfig {
169 fn from(authenticity: MessageAuthenticity) -> Self {
170 match authenticity {
171 MessageAuthenticity::Signed(keypair) => {
172 let public_key = keypair.public();
173 let key_enc = public_key.clone().into_protobuf_encoding();
174 let key = if key_enc.len() <= 42 {
175 None
178 } else {
179 Some(key_enc)
181 };
182
183 PublishConfig::Signing {
184 keypair,
185 author: public_key.into_peer_id(),
186 inline_key: key,
187 }
188 }
189 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
190 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
191 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
192 }
193 }
194}
195
196type GossipsubNetworkBehaviourAction = NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GossipsubEvent>;
197
198pub struct Gossipsub<
210 D: DataTransform = IdentityTransform,
211 F: TopicSubscriptionFilter = AllowAllSubscriptionFilter,
212> {
213 config: GossipsubConfig,
215
216 events: VecDeque<GossipsubNetworkBehaviourAction>,
218
219 control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
221
222 publish_config: PublishConfig,
224
225 duplicate_cache: DuplicateCache<MessageId>,
228
229 peer_protocols: HashMap<PeerId, PeerKind>,
232
233 topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,
235
236 peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,
238
239 explicit_peers: HashSet<PeerId>,
242
243 blacklisted_peers: HashSet<PeerId>,
246
247 mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
249
250 fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
252
253 fanout_last_pub: HashMap<TopicHash, Instant>,
255
256 backoffs: BackoffStorage,
258
259 mcache: MessageCache,
261
262 heartbeat: Interval,
264
265 heartbeat_ticks: u64,
268
269 px_peers: HashSet<PeerId>,
274
275 outbound_peers: HashSet<PeerId>,
278
279 peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
282
283 count_received_ihave: HashMap<PeerId, usize>,
285
286 count_sent_iwant: HashMap<PeerId, usize>,
288
289 published_message_ids: DuplicateCache<MessageId>,
292
293 fast_messsage_id_cache: TimeCache<FastMessageId, MessageId>,
295
296 subscription_filter: F,
298
299 data_transform: D,
303}
304
305impl<D, F> Gossipsub<D, F>
306where
307 D: DataTransform + Default,
308 F: TopicSubscriptionFilter + Default,
309{
310 pub fn new(
313 privacy: MessageAuthenticity,
314 config: GossipsubConfig,
315 ) -> Result<Self, &'static str> {
316 Self::new_with_subscription_filter_and_transform(
317 privacy,
318 config,
319 F::default(),
320 D::default(),
321 )
322 }
323}
324
325impl<D, F> Gossipsub<D, F>
326where
327 D: DataTransform + Default,
328 F: TopicSubscriptionFilter,
329{
330 pub fn new_with_subscription_filter(
333 privacy: MessageAuthenticity,
334 config: GossipsubConfig,
335 subscription_filter: F,
336 ) -> Result<Self, &'static str> {
337 Self::new_with_subscription_filter_and_transform(
338 privacy,
339 config,
340 subscription_filter,
341 D::default(),
342 )
343 }
344}
345
346impl<D, F> Gossipsub<D, F>
347where
348 D: DataTransform,
349 F: TopicSubscriptionFilter + Default,
350{
351 pub fn new_with_transform(
354 privacy: MessageAuthenticity,
355 config: GossipsubConfig,
356 data_transform: D,
357 ) -> Result<Self, &'static str> {
358 Self::new_with_subscription_filter_and_transform(
359 privacy,
360 config,
361 F::default(),
362 data_transform,
363 )
364 }
365}
366
367impl<D, F> Gossipsub<D, F>
368where
369 D: DataTransform,
370 F: TopicSubscriptionFilter,
371{
372 pub fn new_with_subscription_filter_and_transform(
375 privacy: MessageAuthenticity,
376 config: GossipsubConfig,
377 subscription_filter: F,
378 data_transform: D,
379 ) -> Result<Self, &'static str> {
380 validate_config(&privacy, &config.validation_mode())?;
385
386 Ok(Gossipsub {
389 events: VecDeque::new(),
390 control_pool: HashMap::new(),
391 publish_config: privacy.into(),
392 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
393 fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
394 topic_peers: HashMap::new(),
395 peer_topics: HashMap::new(),
396 explicit_peers: HashSet::new(),
397 blacklisted_peers: HashSet::new(),
398 mesh: HashMap::new(),
399 fanout: HashMap::new(),
400 fanout_last_pub: HashMap::new(),
401 backoffs: BackoffStorage::new(
402 &config.prune_backoff(),
403 config.heartbeat_interval(),
404 config.backoff_slack(),
405 ),
406 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
407 heartbeat: Interval::new_at(
408 Instant::now() + config.heartbeat_initial_delay(),
409 config.heartbeat_interval(),
410 ),
411 heartbeat_ticks: 0,
412 px_peers: HashSet::new(),
413 outbound_peers: HashSet::new(),
414 peer_score: None,
415 count_received_ihave: HashMap::new(),
416 count_sent_iwant: HashMap::new(),
417 peer_protocols: HashMap::new(),
418 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
419 config,
420 subscription_filter,
421 data_transform,
422 })
423 }
424}
425
426impl<D, F> Gossipsub<D, F>
427where
428 D: DataTransform,
429 F: TopicSubscriptionFilter,
430{
431 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
433 self.mesh.keys()
434 }
435
436 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
438 self.mesh
439 .get(topic_hash)
440 .into_iter()
441 .map(|x| x.iter())
442 .flatten()
443 }
444
445 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
447 let mut res = BTreeSet::new();
448 for peers in self.mesh.values() {
449 res.extend(peers);
450 }
451 res.into_iter()
452 }
453
454 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
456 self.peer_topics
457 .iter()
458 .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect()))
459 }
460
461 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
463 self.peer_protocols.iter()
464 }
465
466 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
468 self.peer_score
469 .as_ref()
470 .map(|(score, ..)| score.score(peer_id))
471 }
472
473 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
478 debug!("Subscribing to topic: {}", topic);
479 let topic_hash = topic.hash();
480 if !self.subscription_filter.can_subscribe(&topic_hash) {
481 return Err(SubscriptionError::NotAllowed);
482 }
483
484 if self.mesh.get(&topic_hash).is_some() {
485 debug!("Topic: {} is already in the mesh.", topic);
486 return Ok(false);
487 }
488
489 let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
491 if !peer_list.is_empty() {
492 let event = Arc::new(
493 GossipsubRpc {
494 messages: Vec::new(),
495 subscriptions: vec![GossipsubSubscription {
496 topic_hash: topic_hash.clone(),
497 action: GossipsubSubscriptionAction::Subscribe,
498 }],
499 control_msgs: Vec::new(),
500 }
501 .into_protobuf(),
502 );
503
504 for peer in peer_list {
505 debug!("Sending SUBSCRIBE to peer: {:?}", peer);
506 self.send_message(peer, event.clone())
507 .map_err(SubscriptionError::PublishError)?;
508 }
509 }
510
511 self.join(&topic_hash);
514 info!("Subscribed to topic: {}", topic);
515 Ok(true)
516 }
517
518 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, PublishError> {
522 debug!("Unsubscribing from topic: {}", topic);
523 let topic_hash = topic.hash();
524
525 if self.mesh.get(&topic_hash).is_none() {
526 debug!("Already unsubscribed from topic: {:?}", topic_hash);
527 return Ok(false);
529 }
530
531 let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
533 if !peer_list.is_empty() {
534 let event = Arc::new(
535 GossipsubRpc {
536 messages: Vec::new(),
537 subscriptions: vec![GossipsubSubscription {
538 topic_hash: topic_hash.clone(),
539 action: GossipsubSubscriptionAction::Unsubscribe,
540 }],
541 control_msgs: Vec::new(),
542 }
543 .into_protobuf(),
544 );
545
546 for peer in peer_list {
547 debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
548 self.send_message(peer, event.clone())?;
549 }
550 }
551
552 self.leave(&topic_hash);
555
556 info!("Unsubscribed from topic: {:?}", topic_hash);
557 Ok(true)
558 }
559
560 pub fn publish<H: Hasher>(
562 &mut self,
563 topic: Topic<H>,
564 data: impl Into<Vec<u8>>,
565 ) -> Result<MessageId, PublishError> {
566 let data = data.into();
567
568 let transformed_data = self
570 .data_transform
571 .outbound_transform(&topic.hash(), data.clone())?;
572
573 let raw_message = self.build_raw_message(topic.into(), transformed_data)?;
574
575 let msg_id = self.config.message_id(&GossipsubMessage {
577 source: raw_message.source,
578 data, sequence_number: raw_message.sequence_number,
580 topic: raw_message.topic.clone(),
581 });
582
583 let event = Arc::new(
584 GossipsubRpc {
585 subscriptions: Vec::new(),
586 messages: vec![raw_message.clone()],
587 control_msgs: Vec::new(),
588 }
589 .into_protobuf(),
590 );
591
592 if event.encoded_len() > self.config.max_transmit_size() {
594 return Err(PublishError::MessageTooLarge);
595 }
596
597 if self.duplicate_cache.contains(&msg_id) {
599 warn!(
602 "Not publishing a message that has already been published. Msg-id {}",
603 msg_id
604 );
605 return Err(PublishError::Duplicate);
606 }
607
608 debug!("Publishing message: {:?}", msg_id);
609
610 let topic_hash = raw_message.topic.clone();
611
612 let mesh_peers_sent =
614 !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;
615
616 let mut recipient_peers = HashSet::new();
617 if let Some(set) = self.topic_peers.get(&topic_hash) {
618 if self.config.flood_publish() {
619 recipient_peers.extend(
621 set.iter()
622 .filter(|p| {
623 self.explicit_peers.contains(*p)
624 || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
625 })
626 .cloned(),
627 );
628 } else {
629 for peer in &self.explicit_peers {
631 if set.contains(peer) {
632 recipient_peers.insert(*peer);
633 }
634 }
635
636 for (peer, kind) in &self.peer_protocols {
638 if kind == &PeerKind::Floodsub
639 && !self
640 .score_below_threshold(peer, |ts| ts.publish_threshold)
641 .0
642 {
643 recipient_peers.insert(*peer);
644 }
645 }
646
647 if self.mesh.get(&topic_hash).is_none() {
649 debug!("Topic: {:?} not in the mesh", topic_hash);
650 if self.fanout.contains_key(&topic_hash) {
652 for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
653 recipient_peers.insert(*peer);
654 }
655 } else {
656 let mesh_n = self.config.mesh_n();
658 let new_peers = get_random_peers(
659 &self.topic_peers,
660 &self.peer_protocols,
661 &topic_hash,
662 mesh_n,
663 {
664 |p| {
665 !self.explicit_peers.contains(p)
666 && !self
667 .score_below_threshold(p, |pst| pst.publish_threshold)
668 .0
669 }
670 },
671 );
672 self.fanout.insert(topic_hash.clone(), new_peers.clone());
674 for peer in new_peers {
675 debug!("Peer added to fanout: {:?}", peer);
676 recipient_peers.insert(peer);
677 }
678 }
679 self.fanout_last_pub
681 .insert(topic_hash.clone(), Instant::now());
682 }
683 }
684 }
685
686 if recipient_peers.is_empty() && !mesh_peers_sent {
687 return Err(PublishError::InsufficientPeers);
688 }
689
690 self.duplicate_cache.insert(msg_id.clone());
693 self.mcache.put(&msg_id, raw_message);
694
695 if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
698 if !self.config.allow_self_origin() {
699 self.published_message_ids.insert(msg_id.clone());
700 }
701 }
702
703 for peer_id in recipient_peers.iter() {
705 debug!("Sending message to peer: {:?}", peer_id);
706 self.send_message(*peer_id, event.clone())?;
707 }
708
709 info!("Published message: {:?}", &msg_id);
710 Ok(msg_id)
711 }
712
713 pub fn report_message_validation_result(
733 &mut self,
734 msg_id: &MessageId,
735 propagation_source: &PeerId,
736 acceptance: MessageAcceptance,
737 ) -> Result<bool, PublishError> {
738 let reject_reason = match acceptance {
739 MessageAcceptance::Accept => {
740 let raw_message = match self.mcache.validate(msg_id) {
741 Some(raw_message) => raw_message.clone(),
742 None => {
743 warn!(
744 "Message not in cache. Ignoring forwarding. Message Id: {}",
745 msg_id
746 );
747 return Ok(false);
748 }
749 };
750 self.forward_msg(msg_id, raw_message, Some(propagation_source))?;
751 return Ok(true);
752 }
753 MessageAcceptance::Reject => RejectReason::ValidationFailed,
754 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
755 };
756
757 if let Some(raw_message) = self.mcache.remove(msg_id) {
758 if let Some((peer_score, ..)) = &mut self.peer_score {
760 peer_score.reject_message(
761 propagation_source,
762 msg_id,
763 &raw_message.topic,
764 reject_reason,
765 );
766 }
767 Ok(true)
768 } else {
769 warn!("Rejected message not in cache. Message Id: {}", msg_id);
770 Ok(false)
771 }
772 }
773
774 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
776 debug!("Adding explicit peer {}", peer_id);
777
778 self.explicit_peers.insert(*peer_id);
779
780 self.check_explicit_peer_connection(peer_id);
781 }
782
783 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
786 debug!("Removing explicit peer {}", peer_id);
787 self.explicit_peers.remove(peer_id);
788 }
789
790 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
793 if self.blacklisted_peers.insert(*peer_id) {
794 debug!("Peer has been blacklisted: {}", peer_id);
795 }
796 }
797
798 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
800 if self.blacklisted_peers.remove(peer_id) {
801 debug!("Peer has been removed from the blacklist: {}", peer_id);
802 }
803 }
804
805 pub fn with_peer_score(
809 &mut self,
810 params: PeerScoreParams,
811 threshold: PeerScoreThresholds,
812 ) -> Result<(), String> {
813 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
814 }
815
816 pub fn with_peer_score_and_message_delivery_time_callback(
819 &mut self,
820 params: PeerScoreParams,
821 threshold: PeerScoreThresholds,
822 callback: Option<fn(&PeerId, &TopicHash, f64)>,
823 ) -> Result<(), String> {
824 params.validate()?;
825 threshold.validate()?;
826
827 if self.peer_score.is_some() {
828 return Err("Peer score set twice".into());
829 }
830
831 let interval = Interval::new(params.decay_interval);
832 let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
833 self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
834 Ok(())
835 }
836
837 pub fn set_topic_params<H: Hasher>(
841 &mut self,
842 topic: Topic<H>,
843 params: TopicScoreParams,
844 ) -> Result<(), &'static str> {
845 if let Some((peer_score, ..)) = &mut self.peer_score {
846 peer_score.set_topic_params(topic.hash(), params);
847 Ok(())
848 } else {
849 Err("Peer score must be initialised with `with_peer_score()`")
850 }
851 }
852
853 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
856 if let Some((peer_score, ..)) = &mut self.peer_score {
857 peer_score.set_application_score(peer_id, new_score)
858 } else {
859 false
860 }
861 }
862
863 fn join(&mut self, topic_hash: &TopicHash) {
865 debug!("Running JOIN for topic: {:?}", topic_hash);
866
867 if self.mesh.contains_key(topic_hash) {
869 info!("JOIN: The topic is already in the mesh, ignoring JOIN");
870 return;
871 }
872
873 let mut added_peers = HashSet::new();
874
875 if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
878 debug!(
879 "JOIN: Removing peers from the fanout for topic: {:?}",
880 topic_hash
881 );
882
883 peers = peers
885 .into_iter()
886 .filter(|p| {
887 !self.explicit_peers.contains(p)
888 && !self.score_below_threshold(p, |_| 0.0).0
889 && !self.backoffs.is_backoff_with_slack(topic_hash, p)
890 })
891 .collect();
892
893 let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
896 debug!(
897 "JOIN: Adding {:?} peers from the fanout for topic: {:?}",
898 add_peers, topic_hash
899 );
900 added_peers.extend(peers.iter().cloned().take(add_peers));
901 self.mesh.insert(
902 topic_hash.clone(),
903 peers.into_iter().take(add_peers).collect(),
904 );
905 self.fanout_last_pub.remove(topic_hash);
907 }
908
909 if added_peers.len() < self.config.mesh_n() {
911 let new_peers = get_random_peers(
913 &self.topic_peers,
914 &self.peer_protocols,
915 topic_hash,
916 self.config.mesh_n() - added_peers.len(),
917 |peer| {
918 !added_peers.contains(peer)
919 && !self.explicit_peers.contains(peer)
920 && !self.score_below_threshold(peer, |_| 0.0).0
921 && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
922 },
923 );
924 added_peers.extend(new_peers.clone());
925 debug!(
927 "JOIN: Inserting {:?} random peers into the mesh",
928 new_peers.len()
929 );
930 let mesh_peers = self
931 .mesh
932 .entry(topic_hash.clone())
933 .or_insert_with(Default::default);
934 mesh_peers.extend(new_peers);
935 }
936
937 for peer_id in added_peers {
938 info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
940 if let Some((peer_score, ..)) = &mut self.peer_score {
941 peer_score.graft(&peer_id, topic_hash.clone());
942 }
943 Self::control_pool_add(
944 &mut self.control_pool,
945 peer_id,
946 GossipsubControlAction::Graft {
947 topic_hash: topic_hash.clone(),
948 },
949 );
950 }
951 debug!("Completed JOIN for topic: {:?}", topic_hash);
952 }
953
954 fn make_prune(
956 &mut self,
957 topic_hash: &TopicHash,
958 peer: &PeerId,
959 do_px: bool,
960 ) -> GossipsubControlAction {
961 if let Some((peer_score, ..)) = &mut self.peer_score {
962 peer_score.prune(peer, topic_hash.clone());
963 }
964
965 match self.peer_protocols.get(peer) {
966 Some(PeerKind::Floodsub) => {
967 error!("Attempted to prune a Floodsub peer");
968 }
969 Some(PeerKind::Gossipsub) => {
970 return GossipsubControlAction::Prune {
972 topic_hash: topic_hash.clone(),
973 peers: Vec::new(),
974 backoff: None,
975 };
976 }
977 None => {
978 error!("Attempted to Prune an unknown peer");
979 }
980 _ => {} }
982
983 let peers = if do_px {
985 get_random_peers(
986 &self.topic_peers,
987 &self.peer_protocols,
988 &topic_hash,
989 self.config.prune_peers(),
990 |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
991 )
992 .into_iter()
993 .map(|p| PeerInfo { peer_id: Some(p) })
994 .collect()
995 } else {
996 Vec::new()
997 };
998
999 self.backoffs
1001 .update_backoff(topic_hash, peer, self.config.prune_backoff());
1002
1003 GossipsubControlAction::Prune {
1004 topic_hash: topic_hash.clone(),
1005 peers,
1006 backoff: Some(self.config.prune_backoff().as_secs()),
1007 }
1008 }
1009
1010 fn leave(&mut self, topic_hash: &TopicHash) {
1012 debug!("Running LEAVE for topic {:?}", topic_hash);
1013
1014 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1016 for peer in peers {
1017 info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
1019 let control = self.make_prune(topic_hash, &peer, self.config.do_px());
1020 Self::control_pool_add(&mut self.control_pool, peer, control);
1021 }
1022 }
1023 debug!("Completed LEAVE for topic: {:?}", topic_hash);
1024 }
1025
1026 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1028 if !self.peer_topics.contains_key(peer_id) {
1029 debug!("Connecting to explicit peer {:?}", peer_id);
1031 self.events.push_back(NetworkBehaviourAction::DialPeer {
1032 peer_id: *peer_id,
1033 condition: DialPeerCondition::Disconnected,
1034 });
1035 }
1036 }
1037
1038 fn score_below_threshold(
1041 &self,
1042 peer_id: &PeerId,
1043 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1044 ) -> (bool, f64) {
1045 Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1046 }
1047
1048 fn score_below_threshold_from_scores(
1049 peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
1050 peer_id: &PeerId,
1051 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1052 ) -> (bool, f64) {
1053 if let Some((peer_score, thresholds, ..)) = peer_score {
1054 let score = peer_score.score(peer_id);
1055 if score < threshold(thresholds) {
1056 return (true, score);
1057 }
1058 (false, score)
1059 } else {
1060 (false, 0.0)
1061 }
1062 }
1063
1064 fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1067 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1069 debug!(
1070 "IHAVE: ignoring peer {:?} with score below threshold [score = {}]",
1071 peer_id, score
1072 );
1073 return;
1074 }
1075
1076 let peer_have = self
1078 .count_received_ihave
1079 .entry(*peer_id)
1080 .or_insert(0);
1081 *peer_have += 1;
1082 if *peer_have > self.config.max_ihave_messages() {
1083 debug!(
1084 "IHAVE: peer {} has advertised too many times ({}) within this heartbeat \
1085 interval; ignoring",
1086 peer_id, *peer_have
1087 );
1088 return;
1089 }
1090
1091 if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1092 if *iasked >= self.config.max_ihave_length() {
1093 debug!(
1094 "IHAVE: peer {} has already advertised too many messages ({}); ignoring",
1095 peer_id, *iasked
1096 );
1097 return;
1098 }
1099 }
1100
1101 debug!("Handling IHAVE for peer: {:?}", peer_id);
1102
1103 let mut iwant_ids = HashSet::new();
1105
1106 for (topic, ids) in ihave_msgs {
1107 if !self.mesh.contains_key(&topic) {
1109 debug!(
1110 "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
1111 topic
1112 );
1113 continue;
1114 }
1115
1116 for id in ids {
1117 if !self.duplicate_cache.contains(&id) {
1118 iwant_ids.insert(id);
1120 }
1121 }
1122 }
1123
1124 if !iwant_ids.is_empty() {
1125 let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1126 let mut iask = iwant_ids.len();
1127 if *iasked + iask > self.config.max_ihave_length() {
1128 iask = self.config.max_ihave_length().saturating_sub(*iasked);
1129 }
1130
1131 debug!(
1133 "IHAVE: Asking for {} out of {} messages from {}",
1134 iask,
1135 iwant_ids.len(),
1136 peer_id
1137 );
1138
1139 let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
1141 let mut rng = thread_rng();
1142 iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
1143
1144 iwant_ids_vec.truncate(iask as usize);
1145 *iasked += iask;
1146
1147 let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
1148 if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
1149 gossip_promises.add_promise(
1150 *peer_id,
1151 &message_ids,
1152 Instant::now() + self.config.iwant_followup_time(),
1153 );
1154 }
1155 debug!(
1156 "IHAVE: Asking for the following messages from {}: {:?}",
1157 peer_id, message_ids
1158 );
1159
1160 Self::control_pool_add(
1161 &mut self.control_pool,
1162 *peer_id,
1163 GossipsubControlAction::IWant { message_ids },
1164 );
1165 }
1166 debug!("Completed IHAVE handling for peer: {:?}", peer_id);
1167 }
1168
1169 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1172 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1174 debug!(
1175 "IWANT: ignoring peer {:?} with score below threshold [score = {}]",
1176 peer_id, score
1177 );
1178 return;
1179 }
1180
1181 debug!("Handling IWANT for peer: {:?}", peer_id);
1182 let mut cached_messages = HashMap::new();
1184
1185 for id in iwant_msgs {
1186 if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
1189 if count > self.config.gossip_retransimission() {
1190 debug!(
1191 "IWANT: Peer {} has asked for message {} too many times; ignoring \
1192 request",
1193 peer_id, &id
1194 );
1195 } else {
1196 cached_messages.insert(id.clone(), msg.clone());
1197 }
1198 }
1199 }
1200
1201 if !cached_messages.is_empty() {
1202 debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
1203 let message_list = cached_messages
1205 .into_iter()
1206 .map(|entry| entry.1)
1207 .collect();
1208 if self
1209 .send_message(
1210 *peer_id,
1211 GossipsubRpc {
1212 subscriptions: Vec::new(),
1213 messages: message_list,
1214 control_msgs: Vec::new(),
1215 }
1216 .into_protobuf(),
1217 )
1218 .is_err()
1219 {
1220 error!("Failed to send cached messages. Messages too large");
1221 }
1222 }
1223 debug!("Completed IWANT handling for peer: {}", peer_id);
1224 }
1225
1226 fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1229 debug!("Handling GRAFT message for peer: {}", peer_id);
1230
1231 let mut to_prune_topics = HashSet::new();
1232
1233 let mut do_px = self.config.do_px();
1234
1235 if self.explicit_peers.contains(peer_id) {
1237 warn!("GRAFT: ignoring request from direct peer {}", peer_id);
1238 to_prune_topics = HashSet::from_iter(topics.into_iter());
1240 do_px = false
1242 } else {
1243 let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1244 let now = Instant::now();
1245 for topic_hash in topics {
1246 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1247 if peers.contains(peer_id) {
1249 debug!(
1250 "GRAFT: Received graft for peer {:?} that is already in topic {:?}",
1251 peer_id, &topic_hash
1252 );
1253 continue;
1254 }
1255
1256 if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1258 {
1259 if backoff_time > now {
1260 warn!(
1261 "GRAFT: peer attempted graft within backoff time, penalizing {}",
1262 peer_id
1263 );
1264 if let Some((peer_score, ..)) = &mut self.peer_score {
1266 peer_score.add_penalty(peer_id, 1);
1267
1268 let flood_cutoff = (backoff_time
1270 + self.config.graft_flood_threshold())
1271 - self.config.prune_backoff();
1272 if flood_cutoff > now {
1273 peer_score.add_penalty(peer_id, 1);
1275 }
1276 }
1277 do_px = false;
1279
1280 to_prune_topics.insert(topic_hash.clone());
1281 continue;
1282 }
1283 }
1284
1285 if below_zero {
1287 debug!(
1289 "GRAFT: ignoring peer {:?} with negative score [score = {}, \
1290 topic = {}]",
1291 peer_id, score, topic_hash
1292 );
1293 to_prune_topics.insert(topic_hash.clone());
1295 do_px = false;
1297 continue;
1298 }
1299
1300 if peers.len() >= self.config.mesh_n_high()
1303 && !self.outbound_peers.contains(peer_id)
1304 {
1305 to_prune_topics.insert(topic_hash.clone());
1306 continue;
1307 }
1308
1309 info!(
1311 "GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
1312 peer_id, &topic_hash
1313 );
1314 peers.insert(*peer_id);
1315
1316 if let Some((peer_score, ..)) = &mut self.peer_score {
1317 peer_score.graft(peer_id, topic_hash);
1318 }
1319 } else {
1320 do_px = false;
1322 debug!(
1323 "GRAFT: Received graft for unknown topic {:?} from peer {:?}",
1324 &topic_hash, peer_id
1325 );
1326 continue;
1328 }
1329 }
1330 }
1331
1332 if !to_prune_topics.is_empty() {
1333 let prune_messages = to_prune_topics
1335 .iter()
1336 .map(|t| self.make_prune(t, peer_id, do_px))
1337 .collect();
1338 info!(
1340 "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {}",
1341 peer_id
1342 );
1343
1344 if self
1345 .send_message(
1346 *peer_id,
1347 GossipsubRpc {
1348 subscriptions: Vec::new(),
1349 messages: Vec::new(),
1350 control_msgs: prune_messages,
1351 }
1352 .into_protobuf(),
1353 )
1354 .is_err()
1355 {
1356 error!("Failed to send graft. Message too large");
1357 }
1358 }
1359 debug!("Completed GRAFT handling for peer: {}", peer_id);
1360 }
1361
1362 fn remove_peer_from_mesh(
1363 &mut self,
1364 peer_id: &PeerId,
1365 topic_hash: &TopicHash,
1366 backoff: Option<u64>,
1367 always_update_backoff: bool,
1368 ) {
1369 let mut update_backoff = always_update_backoff;
1370 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1371 if peers.remove(peer_id) {
1373 info!(
1374 "PRUNE: Removing peer: {} from the mesh for topic: {}",
1375 peer_id.to_string(),
1376 topic_hash
1377 );
1378
1379 if let Some((peer_score, ..)) = &mut self.peer_score {
1380 peer_score.prune(peer_id, topic_hash.clone());
1381 }
1382
1383 update_backoff = true;
1384 }
1385 }
1386 if update_backoff {
1387 let time = if let Some(backoff) = backoff {
1388 Duration::from_secs(backoff)
1389 } else {
1390 self.config.prune_backoff()
1391 };
1392 self.backoffs.update_backoff(&topic_hash, peer_id, time);
1394 }
1395 }
1396
1397 fn handle_prune(
1399 &mut self,
1400 peer_id: &PeerId,
1401 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1402 ) {
1403 debug!("Handling PRUNE message for peer: {}", peer_id);
1404 let (below_threshold, score) =
1405 self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1406 for (topic_hash, px, backoff) in prune_data {
1407 self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);
1408
1409 if self.mesh.contains_key(&topic_hash) {
1410 if !px.is_empty() {
1412 if below_threshold {
1414 debug!(
1415 "PRUNE: ignoring PX from peer {:?} with insufficient score \
1416 [score ={} topic = {}]",
1417 peer_id, score, topic_hash
1418 );
1419 continue;
1420 }
1421
1422 if self.config.prune_peers() > 0 {
1429 self.px_connect(px);
1430 }
1431 }
1432 }
1433 }
1434 debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
1435 }
1436
1437 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1438 let n = self.config.prune_peers();
1439 px = px.into_iter().filter(|p| p.peer_id.is_some()).collect();
1444 if px.len() > n {
1445 let mut rng = thread_rng();
1447 px.partial_shuffle(&mut rng, n);
1448 px = px.into_iter().take(n).collect();
1449 }
1450
1451 for p in px {
1452 if let Some(peer_id) = p.peer_id {
1455 self.px_peers.insert(peer_id);
1457
1458 self.events.push_back(NetworkBehaviourAction::DialPeer {
1460 peer_id,
1461 condition: DialPeerCondition::Disconnected,
1462 });
1463 }
1464 }
1465 }
1466
1467 fn message_is_valid(
1470 &mut self,
1471 msg_id: &MessageId,
1472 raw_message: &mut RawGossipsubMessage,
1473 propagation_source: &PeerId,
1474 ) -> bool {
1475 debug!(
1476 "Handling message: {:?} from peer: {}",
1477 msg_id,
1478 propagation_source.to_string()
1479 );
1480
1481 if self.blacklisted_peers.contains(propagation_source) {
1483 debug!(
1484 "Rejecting message from blacklisted peer: {}",
1485 propagation_source
1486 );
1487 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1488 peer_score.reject_message(
1489 propagation_source,
1490 msg_id,
1491 &raw_message.topic,
1492 RejectReason::BlackListedPeer,
1493 );
1494 gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
1495 }
1496 return false;
1497 }
1498
1499 if let Some(source) = raw_message.source.as_ref() {
1501 if self.blacklisted_peers.contains(source) {
1502 debug!(
1503 "Rejecting message from peer {} because of blacklisted source: {}",
1504 propagation_source, source
1505 );
1506 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1507 peer_score.reject_message(
1508 propagation_source,
1509 msg_id,
1510 &raw_message.topic,
1511 RejectReason::BlackListedSource,
1512 );
1513 gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource);
1514 }
1515 return false;
1516 }
1517 }
1518
1519 if !self.config.validate_messages() {
1523 raw_message.validated = true;
1524 }
1525
1526 let self_published = !self.config.allow_self_origin()
1528 && if let Some(own_id) = self.publish_config.get_own_id() {
1529 own_id != propagation_source
1530 && raw_message.source.as_ref().map_or(false, |s| s == own_id)
1531 } else {
1532 self.published_message_ids.contains(&msg_id)
1533 };
1534
1535 if self_published {
1536 debug!(
1537 "Dropping message {} claiming to be from self but forwarded from {}",
1538 msg_id, propagation_source
1539 );
1540 if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
1541 peer_score.reject_message(
1542 propagation_source,
1543 msg_id,
1544 &raw_message.topic,
1545 RejectReason::SelfOrigin,
1546 );
1547 gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin);
1548 }
1549 return false;
1550 }
1551
1552 true
1553 }
1554
1555 fn handle_received_message(
1559 &mut self,
1560 mut raw_message: RawGossipsubMessage,
1561 propagation_source: &PeerId,
1562 ) {
1563 let fast_message_id = self.config.fast_message_id(&raw_message);
1564 if let Some(fast_message_id) = fast_message_id.as_ref() {
1565 if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) {
1566 let msg_id = msg_id.clone();
1567 self.message_is_valid(&msg_id, &mut raw_message, propagation_source);
1568 if let Some((peer_score, ..)) = &mut self.peer_score {
1569 peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic);
1570 }
1571 return;
1572 }
1573 }
1574
1575 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1577 Ok(message) => message,
1578 Err(e) => {
1579 debug!("Invalid message. Transform error: {:?}", e);
1580 self.handle_invalid_message(
1582 propagation_source,
1583 raw_message,
1584 ValidationError::TransformFailed,
1585 );
1586 return;
1587 }
1588 };
1589
1590 let msg_id = self.config.message_id(&message);
1592
1593 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1597 return;
1598 }
1599
1600 if let Some(fast_message_id) = fast_message_id {
1602 self.fast_messsage_id_cache
1604 .entry(fast_message_id)
1605 .or_insert_with(|| msg_id.clone());
1606 }
1607 if !self.duplicate_cache.insert(msg_id.clone()) {
1608 debug!(
1609 "Message already received, ignoring. Message: {}",
1610 msg_id
1611 );
1612 if let Some((peer_score, ..)) = &mut self.peer_score {
1613 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1614 }
1615 return;
1616 }
1617 debug!(
1618 "Put message {:?} in duplicate_cache and resolve promises",
1619 msg_id
1620 );
1621
1622 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1625 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1626 gossip_promises.message_delivered(&msg_id);
1627 }
1628
1629 self.mcache.put(&msg_id, raw_message.clone());
1631
1632 if self.mesh.contains_key(&message.topic) {
1634 debug!("Sending received message to user");
1635 self.events.push_back(NetworkBehaviourAction::GenerateEvent(
1636 GossipsubEvent::Message {
1637 propagation_source: *propagation_source,
1638 message_id: msg_id.clone(),
1639 message,
1640 },
1641 ));
1642 } else {
1643 debug!(
1644 "Received message on a topic we are not subscribed to: {:?}",
1645 message.topic
1646 );
1647 return;
1648 }
1649
1650 if !self.config.validate_messages() {
1652 if self
1653 .forward_msg(&msg_id, raw_message, Some(propagation_source))
1654 .is_err()
1655 {
1656 error!("Failed to forward message. Too large");
1657 }
1658 debug!("Completed message handling for message: {:?}", msg_id);
1659 }
1660 }
1661
1662 fn handle_invalid_message(
1664 &mut self,
1665 propagation_source: &PeerId,
1666 raw_message: RawGossipsubMessage,
1667 validation_error: ValidationError,
1668 ) {
1669 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1670 let reason = RejectReason::ValidationError(validation_error);
1671 let fast_message_id_cache = &self.fast_messsage_id_cache;
1672 if let Some(msg_id) = self
1673 .config
1674 .fast_message_id(&raw_message)
1675 .and_then(|id| fast_message_id_cache.get(&id))
1676 {
1677 peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason);
1678 gossip_promises.reject_message(msg_id, &reason);
1679 } else {
1680 peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1684 }
1685 }
1686 }
1687
1688 fn handle_received_subscriptions(
1690 &mut self,
1691 subscriptions: &[GossipsubSubscription],
1692 propagation_source: &PeerId,
1693 ) {
1694 debug!(
1695 "Handling subscriptions: {:?}, from source: {}",
1696 subscriptions,
1697 propagation_source.to_string()
1698 );
1699
1700 let mut unsubscribed_peers = Vec::new();
1701
1702 let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
1703 Some(topics) => topics,
1704 None => {
1705 error!(
1706 "Subscription by unknown peer: {}",
1707 propagation_source.to_string()
1708 );
1709 return;
1710 }
1711 };
1712
1713 let mut grafts = Vec::new();
1715
1716 let mut application_event = Vec::new();
1718
1719 let filtered_topics = match self
1720 .subscription_filter
1721 .filter_incoming_subscriptions(subscriptions, subscribed_topics)
1722 {
1723 Ok(topics) => topics,
1724 Err(s) => {
1725 error!(
1726 "Subscription filter error: {}; ignoring RPC from peer {}",
1727 s,
1728 propagation_source.to_string()
1729 );
1730 return;
1731 }
1732 };
1733
1734 for subscription in filtered_topics {
1735 let peer_list = self
1737 .topic_peers
1738 .entry(subscription.topic_hash.clone())
1739 .or_insert_with(Default::default);
1740
1741 match subscription.action {
1742 GossipsubSubscriptionAction::Subscribe => {
1743 if peer_list.insert(*propagation_source) {
1744 debug!(
1745 "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
1746 propagation_source.to_string(),
1747 subscription.topic_hash
1748 );
1749 }
1750
1751 subscribed_topics.insert(subscription.topic_hash.clone());
1753
1754 if !self.explicit_peers.contains(propagation_source)
1756 && match self.peer_protocols.get(propagation_source) {
1757 Some(PeerKind::Gossipsubv1_1) => true,
1758 Some(PeerKind::Gossipsub) => true,
1759 _ => false,
1760 }
1761 && !Self::score_below_threshold_from_scores(
1762 &self.peer_score,
1763 propagation_source,
1764 |_| 0.0,
1765 )
1766 .0
1767 && !self
1768 .backoffs
1769 .is_backoff_with_slack(&subscription.topic_hash, propagation_source)
1770 {
1771 if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
1772 if peers.len() < self.config.mesh_n_low()
1773 && peers.insert(*propagation_source)
1774 {
1775 debug!(
1776 "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
1777 propagation_source.to_string(),
1778 subscription.topic_hash
1779 );
1780 debug!(
1782 "Sending GRAFT to peer {} for topic {:?}",
1783 propagation_source.to_string(),
1784 subscription.topic_hash
1785 );
1786 if let Some((peer_score, ..)) = &mut self.peer_score {
1787 peer_score
1788 .graft(propagation_source, subscription.topic_hash.clone());
1789 }
1790 grafts.push(GossipsubControlAction::Graft {
1791 topic_hash: subscription.topic_hash.clone(),
1792 });
1793 }
1794 }
1795 }
1796 application_event.push(NetworkBehaviourAction::GenerateEvent(
1798 GossipsubEvent::Subscribed {
1799 peer_id: *propagation_source,
1800 topic: subscription.topic_hash.clone(),
1801 },
1802 ));
1803 }
1804 GossipsubSubscriptionAction::Unsubscribe => {
1805 if peer_list.remove(propagation_source) {
1806 info!(
1807 "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
1808 propagation_source.to_string(),
1809 subscription.topic_hash
1810 );
1811 }
1812 subscribed_topics.remove(&subscription.topic_hash);
1814 unsubscribed_peers
1815 .push((*propagation_source, subscription.topic_hash.clone()));
1816 application_event.push(NetworkBehaviourAction::GenerateEvent(
1818 GossipsubEvent::Unsubscribed {
1819 peer_id: *propagation_source,
1820 topic: subscription.topic_hash.clone(),
1821 },
1822 ));
1823 }
1824 }
1825 }
1826
1827 for (peer_id, topic_hash) in unsubscribed_peers {
1829 self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
1830 }
1831
1832 if !grafts.is_empty()
1835 && self
1836 .send_message(
1837 *propagation_source,
1838 GossipsubRpc {
1839 subscriptions: Vec::new(),
1840 messages: Vec::new(),
1841 control_msgs: grafts,
1842 }
1843 .into_protobuf(),
1844 )
1845 .is_err()
1846 {
1847 error!("Failed sending grafts. Message too large");
1848 }
1849
1850 for event in application_event {
1852 self.events.push_back(event);
1853 }
1854
1855 trace!(
1856 "Completed handling subscriptions from source: {:?}",
1857 propagation_source
1858 );
1859 }
1860
1861 fn apply_iwant_penalties(&mut self) {
1863 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1864 for (peer, count) in gossip_promises.get_broken_promises() {
1865 peer_score.add_penalty(&peer, count);
1866 }
1867 }
1868 }
1869
1870 fn heartbeat(&mut self) {
1872 debug!("Starting heartbeat");
1873
1874 self.heartbeat_ticks += 1;
1875
1876 let mut to_graft = HashMap::new();
1877 let mut to_prune = HashMap::new();
1878 let mut no_px = HashSet::new();
1879
1880 self.backoffs.heartbeat();
1882
1883 self.count_sent_iwant.clear();
1885 self.count_received_ihave.clear();
1886
1887 self.apply_iwant_penalties();
1889
1890 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
1892 for p in self.explicit_peers.clone() {
1893 self.check_explicit_peer_connection(&p);
1894 }
1895 }
1896
1897 let mut scores = HashMap::new();
1899 let peer_score = &self.peer_score;
1900 let mut score = |p: &PeerId| match peer_score {
1901 Some((peer_score, ..)) => *scores
1902 .entry(*p)
1903 .or_insert_with(|| peer_score.score(p)),
1904 _ => 0.0,
1905 };
1906
1907 for (topic_hash, peers) in self.mesh.iter_mut() {
1909 let explicit_peers = &self.explicit_peers;
1910 let backoffs = &self.backoffs;
1911 let topic_peers = &self.topic_peers;
1912 let outbound_peers = &self.outbound_peers;
1913
1914 let to_remove: Vec<_> = peers
1918 .iter()
1919 .filter(|&p| {
1920 if score(p) < 0.0 {
1921 debug!(
1922 "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
1923 {}]",
1924 p,
1925 score(p),
1926 topic_hash
1927 );
1928
1929 let current_topic = to_prune.entry(*p).or_insert_with(Vec::new);
1930 current_topic.push(topic_hash.clone());
1931 no_px.insert(*p);
1932 true
1933 } else {
1934 false
1935 }
1936 })
1937 .cloned()
1938 .collect();
1939 for peer in to_remove {
1940 peers.remove(&peer);
1941 }
1942
1943 if peers.len() < self.config.mesh_n_low() {
1945 debug!(
1946 "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}",
1947 topic_hash,
1948 peers.len(),
1949 self.config.mesh_n_low()
1950 );
1951 let desired_peers = self.config.mesh_n() - peers.len();
1953 let peer_list = get_random_peers(
1954 topic_peers,
1955 &self.peer_protocols,
1956 topic_hash,
1957 desired_peers,
1958 |peer| {
1959 !peers.contains(peer)
1960 && !explicit_peers.contains(peer)
1961 && !backoffs.is_backoff_with_slack(topic_hash, peer)
1962 && score(peer) >= 0.0
1963 },
1964 );
1965 for peer in &peer_list {
1966 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
1967 current_topic.push(topic_hash.clone());
1968 }
1969 debug!("Updating mesh, new mesh: {:?}", peer_list);
1971 peers.extend(peer_list);
1972 }
1973
1974 if peers.len() > self.config.mesh_n_high() {
1976 debug!(
1977 "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}",
1978 topic_hash,
1979 peers.len(),
1980 self.config.mesh_n_high()
1981 );
1982 let excess_peer_no = peers.len() - self.config.mesh_n();
1983
1984 let mut rng = thread_rng();
1986 let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
1987 shuffled.shuffle(&mut rng);
1988 shuffled
1989 .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal));
1990 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
1992
1993 let mut outbound = {
1995 let outbound_peers = &self.outbound_peers;
1996 shuffled
1997 .iter()
1998 .filter(|p| outbound_peers.contains(*p))
1999 .count()
2000 };
2001
2002 let mut removed = 0;
2005 for peer in shuffled {
2006 if removed == excess_peer_no {
2007 break;
2008 }
2009 if self.outbound_peers.contains(&peer) {
2010 if outbound <= self.config.mesh_outbound_min() {
2011 continue;
2013 } else {
2014 outbound -= 1;
2016 }
2017 }
2018
2019 peers.remove(&peer);
2021 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2022 current_topic.push(topic_hash.clone());
2023 removed += 1;
2024 }
2025 }
2026
2027 if peers.len() >= self.config.mesh_n_low() {
2029 let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2031
2032 if outbound < self.config.mesh_outbound_min() {
2034 let needed = self.config.mesh_outbound_min() - outbound;
2035 let peer_list = get_random_peers(
2036 topic_peers,
2037 &self.peer_protocols,
2038 topic_hash,
2039 needed,
2040 |peer| {
2041 !peers.contains(peer)
2042 && !explicit_peers.contains(peer)
2043 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2044 && score(peer) >= 0.0
2045 && outbound_peers.contains(peer)
2046 },
2047 );
2048 for peer in &peer_list {
2049 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2050 current_topic.push(topic_hash.clone());
2051 }
2052 debug!("Updating mesh, new mesh: {:?}", peer_list);
2054 peers.extend(peer_list);
2055 }
2056 }
2057
2058 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2060 && peers.len() > 1
2061 && self.peer_score.is_some()
2062 {
2063 if let Some((_, thresholds, _, _)) = &self.peer_score {
2064 let mut peers_by_score: Vec<_> = peers.iter().collect();
2074 peers_by_score
2075 .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal));
2076
2077 let middle = peers_by_score.len() / 2;
2078 let median = if peers_by_score.len() % 2 == 0 {
2079 (score(
2080 *peers_by_score.get(middle - 1).expect(
2081 "middle < vector length and middle > 0 since peers.len() > 0",
2082 ),
2083 ) + score(*peers_by_score.get(middle).expect("middle < vector length")))
2084 * 0.5
2085 } else {
2086 score(*peers_by_score.get(middle).expect("middle < vector length"))
2087 };
2088
2089 if median < thresholds.opportunistic_graft_threshold {
2092 let peer_list = get_random_peers(
2093 topic_peers,
2094 &self.peer_protocols,
2095 topic_hash,
2096 self.config.opportunistic_graft_peers(),
2097 |peer| {
2098 !peers.contains(peer)
2099 && !explicit_peers.contains(peer)
2100 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2101 && score(peer) > median
2102 },
2103 );
2104 for peer in &peer_list {
2105 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2106 current_topic.push(topic_hash.clone());
2107 }
2108 debug!(
2110 "Opportunistically graft in topic {} with peers {:?}",
2111 topic_hash, peer_list
2112 );
2113 peers.extend(peer_list);
2114 }
2115 }
2116 }
2117 }
2118
2119 {
2121 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2123 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2124 if *last_pub_time + fanout_ttl < Instant::now() {
2125 debug!(
2126 "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
2127 topic_hash
2128 );
2129 fanout.remove(&topic_hash);
2130 return false;
2131 }
2132 true
2133 });
2134 }
2135
2136 for (topic_hash, peers) in self.fanout.iter_mut() {
2139 let mut to_remove_peers = Vec::new();
2140 let publish_threshold = match &self.peer_score {
2141 Some((_, thresholds, _, _)) => thresholds.publish_threshold,
2142 _ => 0.0,
2143 };
2144 for peer in peers.iter() {
2145 match self.peer_topics.get(peer) {
2147 Some(topics) => {
2148 if !topics.contains(&topic_hash) || score(peer) < publish_threshold {
2149 debug!(
2150 "HEARTBEAT: Peer removed from fanout for topic: {:?}",
2151 topic_hash
2152 );
2153 to_remove_peers.push(*peer);
2154 }
2155 }
2156 None => {
2157 to_remove_peers.push(*peer);
2159 }
2160 }
2161 }
2162 for to_remove in to_remove_peers {
2163 peers.remove(&to_remove);
2164 }
2165
2166 if peers.len() < self.config.mesh_n() {
2168 debug!(
2169 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2170 peers.len(),
2171 self.config.mesh_n()
2172 );
2173 let needed_peers = self.config.mesh_n() - peers.len();
2174 let explicit_peers = &self.explicit_peers;
2175 let new_peers = get_random_peers(
2176 &self.topic_peers,
2177 &self.peer_protocols,
2178 topic_hash,
2179 needed_peers,
2180 |peer| {
2181 !peers.contains(peer)
2182 && !explicit_peers.contains(peer)
2183 && score(peer) < publish_threshold
2184 },
2185 );
2186 peers.extend(new_peers);
2187 }
2188 }
2189
2190 if self.peer_score.is_some() {
2191 trace!("Peer_scores: {:?}", {
2192 for peer in self.peer_topics.keys() {
2193 score(peer);
2194 }
2195 scores
2196 });
2197 trace!("Mesh message deliveries: {:?}", {
2198 self.mesh
2199 .iter()
2200 .map(|(t, peers)| {
2201 (
2202 t.clone(),
2203 peers
2204 .iter()
2205 .map(|p| {
2206 (
2207 *p,
2208 peer_score
2209 .as_ref()
2210 .expect("peer_score.is_some()")
2211 .0
2212 .mesh_message_deliveries(p, t)
2213 .unwrap_or(0.0),
2214 )
2215 })
2216 .collect::<HashMap<PeerId, f64>>(),
2217 )
2218 })
2219 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2220 })
2221 }
2222
2223 self.emit_gossip();
2224
2225 if !to_graft.is_empty() | !to_prune.is_empty() {
2227 self.send_graft_prune(to_graft, to_prune, no_px);
2228 }
2229
2230 self.flush_control_pool();
2232
2233 self.mcache.shift();
2235
2236 debug!("Completed Heartbeat");
2237 }
2238
2239 fn emit_gossip(&mut self) {
2242 let mut rng = thread_rng();
2243 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2244 let mut message_ids = self.mcache.get_gossip_message_ids(&topic_hash);
2245 if message_ids.is_empty() {
2246 return;
2247 }
2248
2249 if message_ids.len() > self.config.max_ihave_length() {
2251 debug!(
2253 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2254 message_ids.len()
2255 );
2256 } else {
2257 message_ids.shuffle(&mut rng);
2259 }
2260
2261 let n_map = |m| {
2263 max(
2264 self.config.gossip_lazy(),
2265 (self.config.gossip_factor() * m as f64) as usize,
2266 )
2267 };
2268 let to_msg_peers = get_random_peers_dynamic(
2270 &self.topic_peers,
2271 &self.peer_protocols,
2272 &topic_hash,
2273 n_map,
2274 |peer| {
2275 !peers.contains(peer)
2276 && !self.explicit_peers.contains(peer)
2277 && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2278 },
2279 );
2280
2281 debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());
2282
2283 for peer in to_msg_peers {
2284 let mut peer_message_ids = message_ids.clone();
2285
2286 if peer_message_ids.len() > self.config.max_ihave_length() {
2287 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2291 peer_message_ids.truncate(self.config.max_ihave_length());
2292 }
2293
2294 Self::control_pool_add(
2296 &mut self.control_pool,
2297 peer,
2298 GossipsubControlAction::IHave {
2299 topic_hash: topic_hash.clone(),
2300 message_ids: peer_message_ids,
2301 },
2302 );
2303 }
2304 }
2305 }
2306
2307 fn send_graft_prune(
2310 &mut self,
2311 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2312 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2313 no_px: HashSet<PeerId>,
2314 ) {
2315 for (peer, topics) in to_graft.iter() {
2317 for topic in topics {
2318 if let Some((peer_score, ..)) = &mut self.peer_score {
2320 peer_score.graft(peer, topic.clone());
2321 }
2322 }
2323 let mut control_msgs: Vec<GossipsubControlAction> = topics
2324 .iter()
2325 .map(|topic_hash| GossipsubControlAction::Graft {
2326 topic_hash: topic_hash.clone(),
2327 })
2328 .collect();
2329
2330 if let Some(topics) = to_prune.remove(peer) {
2332 let mut prunes = topics
2333 .iter()
2334 .map(|topic_hash| {
2335 self.make_prune(
2336 topic_hash,
2337 peer,
2338 self.config.do_px() && !no_px.contains(peer),
2339 )
2340 })
2341 .collect::<Vec<_>>();
2342 control_msgs.append(&mut prunes);
2343 }
2344
2345 if self
2347 .send_message(
2348 *peer,
2349 GossipsubRpc {
2350 subscriptions: Vec::new(),
2351 messages: Vec::new(),
2352 control_msgs,
2353 }
2354 .into_protobuf(),
2355 )
2356 .is_err()
2357 {
2358 error!("Failed to send control messages. Message too large");
2359 }
2360 }
2361
2362 for (peer, topics) in to_prune.iter() {
2364 let remaining_prunes = topics
2365 .iter()
2366 .map(|topic_hash| {
2367 self.make_prune(
2368 topic_hash,
2369 peer,
2370 self.config.do_px() && !no_px.contains(peer),
2371 )
2372 })
2373 .collect();
2374 if self
2375 .send_message(
2376 *peer,
2377 GossipsubRpc {
2378 subscriptions: Vec::new(),
2379 messages: Vec::new(),
2380 control_msgs: remaining_prunes,
2381 }
2382 .into_protobuf(),
2383 )
2384 .is_err()
2385 {
2386 error!("Failed to send prune messages. Message too large");
2387 }
2388 }
2389 }
2390
2391 fn forward_msg(
2395 &mut self,
2396 msg_id: &MessageId,
2397 message: RawGossipsubMessage,
2398 propagation_source: Option<&PeerId>,
2399 ) -> Result<bool, PublishError> {
2400 if let Some((peer_score, ..)) = &mut self.peer_score {
2402 if let Some(peer) = propagation_source {
2403 peer_score.deliver_message(peer, msg_id, &message.topic);
2404 }
2405 }
2406
2407 debug!("Forwarding message: {:?}", msg_id);
2408 let mut recipient_peers = HashSet::new();
2409
2410 let topic = &message.topic;
2412 if let Some(mesh_peers) = self.mesh.get(&topic) {
2414 for peer_id in mesh_peers {
2415 if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
2416 recipient_peers.insert(*peer_id);
2417 }
2418 }
2419 }
2420
2421 for p in &self.explicit_peers {
2423 if let Some(topics) = self.peer_topics.get(p) {
2424 if Some(p) != propagation_source
2425 && Some(p) != message.source.as_ref()
2426 && topics.contains(&message.topic)
2427 {
2428 recipient_peers.insert(*p);
2429 }
2430 }
2431 }
2432
2433 if !recipient_peers.is_empty() {
2435 let event = Arc::new(
2436 GossipsubRpc {
2437 subscriptions: Vec::new(),
2438 messages: vec![message.clone()],
2439 control_msgs: Vec::new(),
2440 }
2441 .into_protobuf(),
2442 );
2443
2444 for peer in recipient_peers.iter() {
2445 debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
2446 self.send_message(*peer, event.clone())?;
2447 }
2448 debug!("Completed forwarding message");
2449 Ok(true)
2450 } else {
2451 Ok(false)
2452 }
2453 }
2454
2455 pub(crate) fn build_raw_message(
2457 &self,
2458 topic: TopicHash,
2459 data: Vec<u8>,
2460 ) -> Result<RawGossipsubMessage, PublishError> {
2461 match &self.publish_config {
2462 PublishConfig::Signing {
2463 ref keypair,
2464 author,
2465 inline_key,
2466 } => {
2467 let sequence_number: u64 = rand::random();
2469
2470 let signature = {
2471 let message = rpc_proto::Message {
2472 from: Some(author.clone().to_bytes()),
2473 data: Some(data.clone()),
2474 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2475 topic: topic.clone().into_string(),
2476 signature: None,
2477 key: None,
2478 };
2479
2480 let mut buf = Vec::with_capacity(message.encoded_len());
2481 message
2482 .encode(&mut buf)
2483 .expect("Buffer has sufficient capacity");
2484
2485 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2487 signature_bytes.extend_from_slice(&buf);
2488 Some(keypair.sign(&signature_bytes)?)
2489 };
2490
2491 Ok(RawGossipsubMessage {
2492 source: Some(*author),
2493 data,
2494 sequence_number: Some(sequence_number),
2497 topic,
2498 signature,
2499 key: inline_key.clone(),
2500 validated: true, })
2502 }
2503 PublishConfig::Author(peer_id) => {
2504 Ok(RawGossipsubMessage {
2505 source: Some(*peer_id),
2506 data,
2507 sequence_number: Some(rand::random()),
2510 topic,
2511 signature: None,
2512 key: None,
2513 validated: true, })
2515 }
2516 PublishConfig::RandomAuthor => {
2517 Ok(RawGossipsubMessage {
2518 source: Some(PeerId::random()),
2519 data,
2520 sequence_number: Some(rand::random()),
2523 topic,
2524 signature: None,
2525 key: None,
2526 validated: true, })
2528 }
2529 PublishConfig::Anonymous => {
2530 Ok(RawGossipsubMessage {
2531 source: None,
2532 data,
2533 sequence_number: None,
2536 topic,
2537 signature: None,
2538 key: None,
2539 validated: true, })
2541 }
2542 }
2543 }
2544
2545 fn control_pool_add(
2547 control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
2548 peer: PeerId,
2549 control: GossipsubControlAction,
2550 ) {
2551 control_pool
2552 .entry(peer)
2553 .or_insert_with(Vec::new)
2554 .push(control);
2555 }
2556
2557 fn flush_control_pool(&mut self) {
2559 for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
2560 if self
2561 .send_message(
2562 peer,
2563 GossipsubRpc {
2564 subscriptions: Vec::new(),
2565 messages: Vec::new(),
2566 control_msgs: controls,
2567 }
2568 .into_protobuf(),
2569 )
2570 .is_err()
2571 {
2572 error!("Failed to flush control pool. Message too large");
2573 }
2574 }
2575 }
2576
2577 fn send_message(
2580 &mut self,
2581 peer_id: PeerId,
2582 message: impl Into<Arc<rpc_proto::Rpc>>,
2583 ) -> Result<(), PublishError> {
2584 let messages = self.fragment_message(message.into())?;
2589
2590 for message in messages {
2591 self.events
2592 .push_back(NetworkBehaviourAction::NotifyHandler {
2593 peer_id,
2594 event: message,
2595 handler: NotifyHandler::Any,
2596 })
2597 }
2598 Ok(())
2599 }
2600
2601 fn fragment_message(
2604 &self,
2605 rpc: Arc<rpc_proto::Rpc>,
2606 ) -> Result<Vec<Arc<rpc_proto::Rpc>>, PublishError> {
2607 if rpc.encoded_len() < self.config.max_transmit_size() {
2608 return Ok(vec![rpc]);
2609 }
2610
2611 let new_rpc = rpc_proto::Rpc {
2612 subscriptions: Vec::new(),
2613 publish: Vec::new(),
2614 control: None,
2615 };
2616
2617 let mut rpc_list = vec![new_rpc.clone()];
2618
2619 macro_rules! create_or_add_rpc {
2622 ($object_size: ident ) => {
2623 let list_index = rpc_list.len() - 1; if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize
2628 > self.config.max_transmit_size()
2629 && rpc_list[list_index] != new_rpc
2630 {
2631 rpc_list.push(new_rpc.clone());
2633 }
2634 };
2635 };
2636
2637 macro_rules! add_item {
2638 ($object: ident, $type: ident ) => {
2639 let object_size = $object.encoded_len();
2640
2641 if object_size + 2 > self.config.max_transmit_size() {
2642 error!("Individual message too large to fragment");
2645 return Err(PublishError::MessageTooLarge);
2646 }
2647
2648 create_or_add_rpc!(object_size);
2649 rpc_list
2650 .last_mut()
2651 .expect("Must have at least one element")
2652 .$type
2653 .push($object.clone());
2654 };
2655 }
2656
2657 for message in &rpc.publish {
2659 add_item!(message, publish);
2660 }
2661 for subscription in &rpc.subscriptions {
2662 add_item!(subscription, subscriptions);
2663 }
2664
2665 let empty_control = rpc_proto::ControlMessage::default();
2668 if let Some(control) = rpc.control.as_ref() {
2669 if control.encoded_len() + 2 > self.config.max_transmit_size() {
2670 for ihave in &control.ihave {
2672 let len = ihave.encoded_len();
2673 create_or_add_rpc!(len);
2674 rpc_list
2675 .last_mut()
2676 .expect("Always an element")
2677 .control
2678 .get_or_insert_with(|| empty_control.clone())
2679 .ihave
2680 .push(ihave.clone());
2681 }
2682 for iwant in &control.iwant {
2683 let len = iwant.encoded_len();
2684 create_or_add_rpc!(len);
2685 rpc_list
2686 .last_mut()
2687 .expect("Always an element")
2688 .control
2689 .get_or_insert_with(|| empty_control.clone())
2690 .iwant
2691 .push(iwant.clone());
2692 }
2693 for graft in &control.graft {
2694 let len = graft.encoded_len();
2695 create_or_add_rpc!(len);
2696 rpc_list
2697 .last_mut()
2698 .expect("Always an element")
2699 .control
2700 .get_or_insert_with(|| empty_control.clone())
2701 .graft
2702 .push(graft.clone());
2703 }
2704 for prune in &control.prune {
2705 let len = prune.encoded_len();
2706 create_or_add_rpc!(len);
2707 rpc_list
2708 .last_mut()
2709 .expect("Always an element")
2710 .control
2711 .get_or_insert_with(|| empty_control.clone())
2712 .prune
2713 .push(prune.clone());
2714 }
2715 } else {
2716 let len = control.encoded_len();
2717 create_or_add_rpc!(len);
2718 rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
2719 }
2720 }
2721
2722 Ok(rpc_list.into_iter().map(Arc::new).collect())
2723 }
2724}
2725
2726fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
2727 addr.iter().find_map(|p| match p {
2728 Ip4(addr) => Some(IpAddr::V4(addr)),
2729 Ip6(addr) => Some(IpAddr::V6(addr)),
2730 _ => None,
2731 })
2732}
2733
2734impl<C, F> NetworkBehaviour for Gossipsub<C, F>
2735where
2736 C: Send + 'static + DataTransform,
2737 F: Send + 'static + TopicSubscriptionFilter,
2738{
2739 type ProtocolsHandler = GossipsubHandler;
2740 type OutEvent = GossipsubEvent;
2741
2742 fn new_handler(&mut self) -> Self::ProtocolsHandler {
2743 GossipsubHandler::new(
2744 self.config.protocol_id_prefix().clone(),
2745 self.config.max_transmit_size(),
2746 self.config.validation_mode().clone(),
2747 self.config.support_floodsub(),
2748 )
2749 }
2750
2751 fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
2752 Vec::new()
2753 }
2754
2755 fn inject_connected(&mut self, peer_id: &PeerId) {
2756 if self.blacklisted_peers.contains(peer_id) {
2758 debug!("Ignoring connection from blacklisted peer: {}", peer_id);
2759 return;
2760 }
2761
2762 info!("New peer connected: {}", peer_id);
2763 let mut subscriptions = vec![];
2765 for topic_hash in self.mesh.keys() {
2766 subscriptions.push(GossipsubSubscription {
2767 topic_hash: topic_hash.clone(),
2768 action: GossipsubSubscriptionAction::Subscribe,
2769 });
2770 }
2771
2772 if !subscriptions.is_empty() {
2773 if self
2775 .send_message(
2776 *peer_id,
2777 GossipsubRpc {
2778 messages: Vec::new(),
2779 subscriptions,
2780 control_msgs: Vec::new(),
2781 }
2782 .into_protobuf(),
2783 )
2784 .is_err()
2785 {
2786 error!("Failed to send subscriptions, message too large");
2787 }
2788 }
2789
2790 self.peer_topics.insert(*peer_id, Default::default());
2792
2793 self.peer_protocols
2799 .entry(*peer_id)
2800 .or_insert(PeerKind::Floodsub);
2801
2802 if let Some((peer_score, ..)) = &mut self.peer_score {
2803 peer_score.add_peer(*peer_id);
2804 }
2805 }
2806
2807 fn inject_disconnected(&mut self, peer_id: &PeerId) {
2808 debug!("Peer disconnected: {}", peer_id);
2810 {
2811 let topics = match self.peer_topics.get(peer_id) {
2812 Some(topics) => (topics),
2813 None => {
2814 if !self.blacklisted_peers.contains(peer_id) {
2815 debug!("Disconnected node, not in connected nodes");
2816 }
2817 return;
2818 }
2819 };
2820
2821 for topic in topics {
2823 if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
2825 mesh_peers.remove(peer_id);
2827 }
2828
2829 if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
2831 if !peer_list.remove(peer_id) {
2832 warn!(
2834 "Disconnected node: {} not in topic_peers peer list",
2835 peer_id
2836 );
2837 }
2838 } else {
2839 warn!(
2840 "Disconnected node: {} with topic: {:?} not in topic_peers",
2841 &peer_id, &topic
2842 );
2843 }
2844
2845 self.fanout
2847 .get_mut(&topic)
2848 .map(|peers| peers.remove(peer_id));
2849 }
2850
2851 self.px_peers.remove(peer_id);
2853 self.outbound_peers.remove(peer_id);
2854 }
2855
2856 self.peer_topics.remove(peer_id);
2860 self.peer_protocols.remove(peer_id);
2861
2862 if let Some((peer_score, ..)) = &mut self.peer_score {
2863 peer_score.remove_peer(peer_id);
2864 }
2865 }
2866
2867 fn inject_connection_established(
2868 &mut self,
2869 peer_id: &PeerId,
2870 _: &ConnectionId,
2871 endpoint: &ConnectedPoint,
2872 ) {
2873 if self.blacklisted_peers.contains(peer_id) {
2875 return;
2876 }
2877
2878 if let ConnectedPoint::Dialer { .. } = endpoint {
2880 if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
2886 self.outbound_peers.insert(*peer_id);
2889 }
2890 }
2891
2892 if let Some((peer_score, ..)) = &mut self.peer_score {
2894 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2895 peer_score.add_ip(&peer_id, ip);
2896 } else {
2897 trace!(
2898 "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2899 peer_id,
2900 endpoint
2901 )
2902 }
2903 }
2904 }
2905
2906 fn inject_connection_closed(
2907 &mut self,
2908 peer: &PeerId,
2909 _: &ConnectionId,
2910 endpoint: &ConnectedPoint,
2911 ) {
2912 if let Some((peer_score, ..)) = &mut self.peer_score {
2914 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2915 peer_score.remove_ip(peer, &ip);
2916 } else {
2917 trace!(
2918 "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2919 peer,
2920 endpoint
2921 )
2922 }
2923 }
2924 }
2925
2926 fn inject_address_change(
2927 &mut self,
2928 peer: &PeerId,
2929 _: &ConnectionId,
2930 endpoint_old: &ConnectedPoint,
2931 endpoint_new: &ConnectedPoint,
2932 ) {
2933 if let Some((peer_score, ..)) = &mut self.peer_score {
2935 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
2936 peer_score.remove_ip(peer, &ip);
2937 } else {
2938 trace!(
2939 "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2940 peer,
2941 endpoint_old
2942 )
2943 }
2944 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
2945 peer_score.add_ip(&peer, ip);
2946 } else {
2947 trace!(
2948 "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2949 peer,
2950 endpoint_new
2951 )
2952 }
2953 }
2954 }
2955
2956 fn inject_event(
2957 &mut self,
2958 propagation_source: PeerId,
2959 _: ConnectionId,
2960 handler_event: HandlerEvent,
2961 ) {
2962 match handler_event {
2963 HandlerEvent::PeerKind(kind) => {
2964 if let PeerKind::NotSupported = kind {
2966 debug!(
2967 "Peer does not support gossipsub protocols. {}",
2968 propagation_source
2969 );
2970 self.inject_disconnected(&propagation_source);
2972 } else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) {
2973 debug!(
2976 "New peer type found: {} for peer: {}",
2977 kind, propagation_source
2978 );
2979 if let PeerKind::Floodsub = *old_kind {
2980 *old_kind = kind;
2981 }
2982 }
2983 }
2984 HandlerEvent::Message {
2985 rpc,
2986 invalid_messages,
2987 } => {
2988 if !rpc.subscriptions.is_empty() {
2993 self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
2994 }
2995
2996 if let (true, _) =
2998 self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
2999 {
3000 debug!("RPC Dropped from greylisted peer {}", propagation_source);
3001 return;
3002 }
3003
3004 if self.peer_score.is_some() {
3006 for (raw_message, validation_error) in invalid_messages {
3007 self.handle_invalid_message(
3008 &propagation_source,
3009 raw_message,
3010 validation_error,
3011 )
3012 }
3013 } else {
3014 for (message, validation_error) in invalid_messages {
3016 warn!(
3017 "Invalid message. Reason: {:?} propagation_peer {} source {:?}",
3018 validation_error,
3019 propagation_source.to_string(),
3020 message.source
3021 );
3022 }
3023 }
3024
3025 for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3027 if self.config.max_messages_per_rpc().is_some()
3029 && Some(count) >= self.config.max_messages_per_rpc()
3030 {
3031 warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3032 break;
3033 }
3034 self.handle_received_message(raw_message, &propagation_source);
3035 }
3036
3037 let mut ihave_msgs = vec![];
3040 let mut graft_msgs = vec![];
3041 let mut prune_msgs = vec![];
3042 for control_msg in rpc.control_msgs {
3043 match control_msg {
3044 GossipsubControlAction::IHave {
3045 topic_hash,
3046 message_ids,
3047 } => {
3048 ihave_msgs.push((topic_hash, message_ids));
3049 }
3050 GossipsubControlAction::IWant { message_ids } => {
3051 self.handle_iwant(&propagation_source, message_ids)
3052 }
3053 GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
3054 GossipsubControlAction::Prune {
3055 topic_hash,
3056 peers,
3057 backoff,
3058 } => prune_msgs.push((topic_hash, peers, backoff)),
3059 }
3060 }
3061 if !ihave_msgs.is_empty() {
3062 self.handle_ihave(&propagation_source, ihave_msgs);
3063 }
3064 if !graft_msgs.is_empty() {
3065 self.handle_graft(&propagation_source, graft_msgs);
3066 }
3067 if !prune_msgs.is_empty() {
3068 self.handle_prune(&propagation_source, prune_msgs);
3069 }
3070 }
3071 }
3072 }
3073
3074 fn poll(
3075 &mut self,
3076 cx: &mut Context<'_>,
3077 _: &mut impl PollParameters,
3078 ) -> Poll<
3079 NetworkBehaviourAction<
3080 <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
3081 Self::OutEvent,
3082 >,
3083 > {
3084 if let Some(event) = self.events.pop_front() {
3085 return Poll::Ready(match event {
3086 NetworkBehaviourAction::NotifyHandler {
3087 peer_id,
3088 handler,
3089 event: send_event,
3090 } => {
3091 let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone());
3093 NetworkBehaviourAction::NotifyHandler {
3094 peer_id,
3095 event,
3096 handler,
3097 }
3098 }
3099 NetworkBehaviourAction::GenerateEvent(e) => {
3100 NetworkBehaviourAction::GenerateEvent(e)
3101 }
3102 NetworkBehaviourAction::DialAddress { address } => {
3103 NetworkBehaviourAction::DialAddress { address }
3104 }
3105 NetworkBehaviourAction::DialPeer { peer_id, condition } => {
3106 NetworkBehaviourAction::DialPeer { peer_id, condition }
3107 }
3108 NetworkBehaviourAction::ReportObservedAddr { address, score } => {
3109 NetworkBehaviourAction::ReportObservedAddr { address, score }
3110 }
3111 });
3112 }
3113
3114 if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
3116 while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) {
3117 peer_score.refresh_scores();
3118 }
3119 }
3120
3121 while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
3122 self.heartbeat();
3123 }
3124
3125 Poll::Pending
3126 }
3127}
3128
3129fn get_random_peers_dynamic(
3133 topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
3134 peer_protocols: &HashMap<PeerId, PeerKind>,
3135 topic_hash: &TopicHash,
3136 n_map: impl Fn(usize) -> usize,
3138 mut f: impl FnMut(&PeerId) -> bool,
3139) -> BTreeSet<PeerId> {
3140 let mut gossip_peers = match topic_peers.get(topic_hash) {
3141 Some(peer_list) => peer_list
3143 .iter()
3144 .cloned()
3145 .filter(|p| {
3146 f(p) && match peer_protocols.get(p) {
3147 Some(PeerKind::Gossipsub) => true,
3148 Some(PeerKind::Gossipsubv1_1) => true,
3149 _ => false,
3150 }
3151 })
3152 .collect(),
3153 None => Vec::new(),
3154 };
3155
3156 let n = n_map(gossip_peers.len());
3158 if gossip_peers.len() <= n {
3159 debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3160 return gossip_peers.into_iter().collect();
3161 }
3162
3163 let mut rng = thread_rng();
3165 gossip_peers.partial_shuffle(&mut rng, n);
3166
3167 debug!("RANDOM PEERS: Got {:?} peers", n);
3168
3169 gossip_peers.into_iter().take(n).collect()
3170}
3171
3172fn get_random_peers(
3175 topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
3176 peer_protocols: &HashMap<PeerId, PeerKind>,
3177 topic_hash: &TopicHash,
3178 n: usize,
3179 f: impl FnMut(&PeerId) -> bool,
3180) -> BTreeSet<PeerId> {
3181 get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
3182}
3183
3184fn validate_config(
3187 authenticity: &MessageAuthenticity,
3188 validation_mode: &ValidationMode,
3189) -> Result<(), &'static str> {
3190 match validation_mode {
3191 ValidationMode::Anonymous => {
3192 if authenticity.is_signing() {
3193 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3194 }
3195
3196 if !authenticity.is_anonymous() {
3197 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3198 }
3199 }
3200 ValidationMode::Strict => {
3201 if !authenticity.is_signing() {
3202 return Err(
3203 "Messages will be
3204 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3205 the validation or privacy settings in the config"
3206 );
3207 }
3208 }
3209 _ => {}
3210 }
3211 Ok(())
3212}
3213
3214impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F> {
3215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3216 f.debug_struct("Gossipsub")
3217 .field("config", &self.config)
3218 .field("events", &self.events)
3219 .field("control_pool", &self.control_pool)
3220 .field("publish_config", &self.publish_config)
3221 .field("topic_peers", &self.topic_peers)
3222 .field("peer_topics", &self.peer_topics)
3223 .field("mesh", &self.mesh)
3224 .field("fanout", &self.fanout)
3225 .field("fanout_last_pub", &self.fanout_last_pub)
3226 .field("mcache", &self.mcache)
3227 .field("heartbeat", &self.heartbeat)
3228 .finish()
3229 }
3230}
3231
3232impl fmt::Debug for PublishConfig {
3233 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3234 match self {
3235 PublishConfig::Signing { author, .. } => {
3236 f.write_fmt(format_args!("PublishConfig::Signing({})", author))
3237 }
3238 PublishConfig::Author(author) => {
3239 f.write_fmt(format_args!("PublishConfig::Author({})", author))
3240 }
3241 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3242 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3243 }
3244 }
3245}
3246
3247#[cfg(test)]
3248mod local_test {
3249 use super::*;
3250 use crate::IdentTopic;
3251 use asynchronous_codec::Encoder;
3252 use quickcheck::*;
3253 use rand::Rng;
3254
3255 fn empty_rpc() -> GossipsubRpc {
3256 GossipsubRpc {
3257 subscriptions: Vec::new(),
3258 messages: Vec::new(),
3259 control_msgs: Vec::new(),
3260 }
3261 }
3262
3263 fn test_message() -> RawGossipsubMessage {
3264 RawGossipsubMessage {
3265 source: Some(PeerId::random()),
3266 data: vec![0; 100],
3267 sequence_number: None,
3268 topic: TopicHash::from_raw("test_topic"),
3269 signature: None,
3270 key: None,
3271 validated: false,
3272 }
3273 }
3274
3275 fn test_subscription() -> GossipsubSubscription {
3276 GossipsubSubscription {
3277 action: GossipsubSubscriptionAction::Subscribe,
3278 topic_hash: IdentTopic::new("TestTopic").hash(),
3279 }
3280 }
3281
3282 fn test_control() -> GossipsubControlAction {
3283 GossipsubControlAction::IHave {
3284 topic_hash: IdentTopic::new("TestTopic").hash(),
3285 message_ids: vec![MessageId(vec![12u8]); 5],
3286 }
3287 }
3288
3289 impl Arbitrary for GossipsubRpc {
3290 fn arbitrary<G: Gen>(g: &mut G) -> Self {
3291 let mut rpc = empty_rpc();
3292
3293 for _ in 0..g.gen_range(0, 10) {
3294 rpc.subscriptions.push(test_subscription());
3295 }
3296 for _ in 0..g.gen_range(0, 10) {
3297 rpc.messages.push(test_message());
3298 }
3299 for _ in 0..g.gen_range(0, 10) {
3300 rpc.control_msgs.push(test_control());
3301 }
3302 rpc
3303 }
3304 }
3305
3306 #[test]
3307 fn test_message_fragmentation_deterministic() {
3309 let max_transmit_size = 500;
3310 let config = crate::GossipsubConfigBuilder::default()
3311 .max_transmit_size(max_transmit_size)
3312 .validation_mode(ValidationMode::Permissive)
3313 .build()
3314 .unwrap();
3315 let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
3316
3317 let mut rpc = empty_rpc();
3319 rpc.messages.push(test_message());
3320
3321 let mut rpc_proto = rpc.clone().into_protobuf();
3322 let fragmented_messages = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap();
3323 assert_eq!(
3324 fragmented_messages,
3325 vec![Arc::new(rpc_proto.clone())],
3326 "Messages under the limit shouldn't be fragmented"
3327 );
3328
3329 while rpc_proto.encoded_len() < max_transmit_size {
3332 rpc.messages.push(test_message());
3333 rpc_proto = rpc.clone().into_protobuf();
3334 }
3335
3336 let fragmented_messages = gs
3337 .fragment_message(Arc::new(rpc_proto))
3338 .expect("Should be able to fragment the messages");
3339
3340 assert!(
3341 fragmented_messages.len() > 1,
3342 "the message should be fragmented"
3343 );
3344
3345 for message in fragmented_messages {
3347 assert!(
3348 message.encoded_len() < max_transmit_size,
3349 "all messages should be less than the transmission size"
3350 );
3351 }
3352 }
3353
3354 #[test]
3355 fn test_message_fragmentation() {
3356 fn prop(rpc: GossipsubRpc) {
3357 let max_transmit_size = 500;
3358 let config = crate::GossipsubConfigBuilder::default()
3359 .max_transmit_size(max_transmit_size)
3360 .validation_mode(ValidationMode::Permissive)
3361 .build()
3362 .unwrap();
3363 let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
3364
3365 let mut length_codec = unsigned_varint::codec::UviBytes::default();
3366 length_codec.set_max_len(max_transmit_size);
3367 let mut codec =
3368 crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);
3369
3370 let rpc_proto = rpc.into_protobuf();
3371 let fragmented_messages = gs
3372 .fragment_message(Arc::new(rpc_proto.clone()))
3373 .expect("Messages must be valid");
3374
3375 if rpc_proto.encoded_len() < max_transmit_size {
3376 assert_eq!(
3377 fragmented_messages.len(),
3378 1,
3379 "the message should not be fragmented"
3380 );
3381 } else {
3382 assert!(
3383 fragmented_messages.len() > 1,
3384 "the message should be fragmented"
3385 );
3386 }
3387
3388 for message in fragmented_messages {
3390 assert!(
3391 message.encoded_len() < max_transmit_size,
3392 "all messages should be less than the transmission size: list size {} max size{}", message.encoded_len(), max_transmit_size
3393 );
3394
3395 let mut buf = bytes::BytesMut::with_capacity(message.encoded_len());
3397 codec
3398 .encode(Arc::try_unwrap(message).unwrap(), &mut buf)
3399 .unwrap()
3400 }
3401 }
3402 QuickCheck::new()
3403 .max_tests(100)
3404 .quickcheck(prop as fn(_) -> _)
3405 }
3406}