use std::{
    cmp::{max, Ordering},
    collections::{BTreeSet, HashMap, HashSet, VecDeque},
    fmt,
    iter::FromIterator,
    net::IpAddr,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};
33
34use futures::StreamExt;
35use log::{debug, error, info, trace, warn};
36use prost::Message;
37use rand::{seq::SliceRandom, seq::IteratorRandom, thread_rng, RngCore};
38use wasm_timer::{Instant, Interval};
39
40use mwc_libp2p_core::{
41 connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
42 multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, SimplePushSerializer
43};
44use mwc_libp2p_swarm::{
45 DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
46 ProtocolsHandler,
47};
48
49use crate::backoff::BackoffStorage;
50use crate::config::{GossipsubConfig, ValidationMode};
51use crate::error::{PublishError, SubscriptionError, ValidationError};
52use crate::gossip_promises::GossipPromises;
53use crate::handler::{GossipsubHandler, HandlerEvent};
54use crate::mcache::MessageCache;
55use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
56use crate::protocol::SIGNING_PREFIX;
57use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
58use crate::time_cache::{DuplicateCache, TimeCache};
59use crate::topic::{Hasher, Topic, TopicHash, IdentityHash};
60use crate::transform::{DataTransform, IdentityTransform};
61use crate::types::{
62 FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
63 GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
64};
65use crate::types::{GossipsubRpc, PeerKind};
66use crate::{rpc_proto, TopicScoreParams};
67use std::{cmp::Ordering::Equal, fmt::Debug};
68use std::ops::Add;
69
70#[cfg(test)]
71mod tests;
72
pub static PEER_TOPIC: &str = "Peers";
pub const PEER_EXCHANGE_NUMBER_LIMIT: usize = 20;
77
/// Determines how messages published by this node are authored and signed.
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Sign messages with the given [`Keypair`]; its public key determines the message author.
    Signed(Keypair),
    /// Do not sign messages, but use the given [`PeerId`] as the message author.
    Author(PeerId),
    /// Do not sign messages and use a freshly generated random [`PeerId`] as the author.
    RandomAuthor,
    /// Publish messages without an author, sequence number or signature.
    Anonymous,
}
110
impl MessageAuthenticity {
    /// Returns true if message signing is enabled.
    pub fn is_signing(&self) -> bool {
        matches!(self, MessageAuthenticity::Signed(_))
    }

    /// Returns true if messages are published anonymously.
    pub fn is_anonymous(&self) -> bool {
        matches!(self, MessageAuthenticity::Anonymous)
    }
}
121
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum GossipsubEvent {
    /// A message has been received.
    Message {
        /// The peer that forwarded us this message.
        propagation_source: PeerId,
        /// The [`MessageId`] of the message. The application references this id when reporting
        /// validation results.
        message_id: MessageId,
        /// The message itself, after the inbound data transform has been applied.
        message: GossipsubMessage,
    },
    /// A remote peer subscribed to a topic.
    Subscribed {
        /// The remote that subscribed.
        peer_id: PeerId,
        /// The topic it subscribed to.
        topic: TopicHash,
    },
    /// A remote peer unsubscribed from a topic.
    Unsubscribed {
        /// The remote that unsubscribed.
        peer_id: PeerId,
        /// The topic it unsubscribed from.
        topic: TopicHash,
    },
}
150
151enum PublishConfig {
154 Signing {
155 keypair: Keypair,
156 author: PeerId,
157 inline_key: Option<Vec<u8>>,
158 },
159 Author(PeerId),
160 RandomAuthor,
161 Anonymous,
162}
163
164impl PublishConfig {
165 pub fn get_own_id(&self) -> Option<&PeerId> {
166 match self {
167 Self::Signing { author, .. } => Some(&author),
168 Self::Author(author) => Some(&author),
169 _ => None,
170 }
171 }
172}
173
174impl From<MessageAuthenticity> for PublishConfig {
175 fn from(authenticity: MessageAuthenticity) -> Self {
176 match authenticity {
177 MessageAuthenticity::Signed(keypair) => {
178 let public_key = keypair.public();
179 let key_enc = public_key.clone().into_protobuf_encoding();
180 let key = if key_enc.len() <= 42 {
181 None
184 } else {
185 Some(key_enc)
187 };
188
189 PublishConfig::Signing {
190 keypair,
191 author: public_key.into_peer_id(),
192 inline_key: key,
193 }
194 }
195 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
196 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
197 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
198 }
199 }
200}
201
202type GossipsubNetworkBehaviourAction = NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GossipsubEvent>;
203
204pub struct Gossipsub<
216 D: DataTransform = IdentityTransform,
217 F: TopicSubscriptionFilter = AllowAllSubscriptionFilter,
218> {
219 config: GossipsubConfig,
221
222 events: VecDeque<GossipsubNetworkBehaviourAction>,
224
225 control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
227
228 publish_config: PublishConfig,
230
231 duplicate_cache: DuplicateCache<MessageId>,
234
235 peer_protocols: HashMap<PeerId, PeerKind>,
238
239 topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,
241
242 peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,
244
245 explicit_peers: HashSet<PeerId>,
248
249 blacklisted_peers: HashMap<PeerId, Instant>,
254
255 mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
257
258 fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
260
261 fanout_last_pub: HashMap<TopicHash, Instant>,
263
264 backoffs: BackoffStorage,
266
267 mcache: MessageCache,
269
270 heartbeat: Interval,
272
273 heartbeat_ticks: u64,
276
277 px_peers: HashSet<PeerId>,
282
283 outbound_peers: HashSet<PeerId>,
286
287 peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
290
291 count_received_ihave: HashMap<PeerId, usize>,
293
294 count_sent_iwant: HashMap<PeerId, usize>,
296
297 published_message_ids: DuplicateCache<MessageId>,
300
301 fast_messsage_id_cache: TimeCache<FastMessageId, MessageId>,
303
304 subscription_filter: F,
306
307 data_transform: D,
311}
312
313impl<D, F> Gossipsub<D, F>
314where
315 D: DataTransform + Default,
316 F: TopicSubscriptionFilter + Default,
317{
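    /// Creates a [`Gossipsub`] behaviour from a [`GossipsubConfig`], using the default
    /// subscription filter and data transform.
    ///
    /// A minimal construction sketch (not compiled here; assumes the crate's default config
    /// is acceptable for the use case):
    ///
    /// ```ignore
    /// use mwc_libp2p_core::identity::Keypair;
    ///
    /// let keypair = Keypair::generate_ed25519();
    /// let config = GossipsubConfig::default();
    /// let gossipsub: Gossipsub =
    ///     Gossipsub::new(MessageAuthenticity::Signed(keypair), config).expect("valid config");
    /// ```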
318 pub fn new(
321 privacy: MessageAuthenticity,
322 config: GossipsubConfig,
323 ) -> Result<Self, &'static str> {
324 Self::new_with_subscription_filter_and_transform(
325 privacy,
326 config,
327 F::default(),
328 D::default(),
329 )
330 }
331}
332
333impl<D, F> Gossipsub<D, F>
334where
335 D: DataTransform + Default,
336 F: TopicSubscriptionFilter,
337{
338 pub fn new_with_subscription_filter(
341 privacy: MessageAuthenticity,
342 config: GossipsubConfig,
343 subscription_filter: F,
344 ) -> Result<Self, &'static str> {
345 Self::new_with_subscription_filter_and_transform(
346 privacy,
347 config,
348 subscription_filter,
349 D::default(),
350 )
351 }
352}
353
354impl<D, F> Gossipsub<D, F>
355where
356 D: DataTransform,
357 F: TopicSubscriptionFilter + Default,
358{
359 pub fn new_with_transform(
362 privacy: MessageAuthenticity,
363 config: GossipsubConfig,
364 data_transform: D,
365 ) -> Result<Self, &'static str> {
366 Self::new_with_subscription_filter_and_transform(
367 privacy,
368 config,
369 F::default(),
370 data_transform,
371 )
372 }
373}
374
375impl<D, F> Gossipsub<D, F>
376where
377 D: DataTransform,
378 F: TopicSubscriptionFilter,
379{
380 pub fn new_with_subscription_filter_and_transform(
383 privacy: MessageAuthenticity,
384 config: GossipsubConfig,
385 subscription_filter: F,
386 data_transform: D,
387 ) -> Result<Self, &'static str> {
388 validate_config(&privacy, &config.validation_mode())?;
393
394 Ok(Gossipsub {
397 events: VecDeque::new(),
398 control_pool: HashMap::new(),
399 publish_config: privacy.into(),
400 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
401 fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
402 topic_peers: HashMap::new(),
403 peer_topics: HashMap::new(),
404 explicit_peers: HashSet::new(),
405 blacklisted_peers: HashMap::new(),
406 mesh: HashMap::new(),
407 fanout: HashMap::new(),
408 fanout_last_pub: HashMap::new(),
409 backoffs: BackoffStorage::new(
410 &config.prune_backoff(),
411 config.heartbeat_interval(),
412 config.backoff_slack(),
413 ),
414 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
415 heartbeat: Interval::new_at(
416 Instant::now() + config.heartbeat_initial_delay(),
417 config.heartbeat_interval(),
418 ),
419 heartbeat_ticks: 0,
420 px_peers: HashSet::new(),
421 outbound_peers: HashSet::new(),
422 peer_score: None,
423 count_received_ihave: HashMap::new(),
424 count_sent_iwant: HashMap::new(),
425 peer_protocols: HashMap::new(),
426 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
427 config,
428 subscription_filter,
429 data_transform,
430 })
431 }
432}
433
434impl<D, F> Gossipsub<D, F>
435where
436 D: DataTransform,
437 F: TopicSubscriptionFilter,
438{
439 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
441 self.mesh.keys()
442 }
443
    /// Lists the peers of a given topic that are currently in the mesh.
    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
        self.mesh
            .get(topic_hash)
            .into_iter()
            .flat_map(|x| x.iter())
    }
452
453 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
455 let mut res = BTreeSet::new();
456 for peers in self.mesh.values() {
457 res.extend(peers);
458 }
459 res.into_iter()
460 }
461
462 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
464 self.peer_topics
465 .iter()
466 .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect()))
467 }
468
469 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
471 self.peer_protocols.iter()
472 }
473
474 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
476 self.peer_score
477 .as_ref()
478 .map(|(score, ..)| score.score(peer_id))
479 }
480
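    /// Subscribes to a topic, announcing the subscription to all connected peers and joining
    /// the topic mesh. Returns `Ok(true)` for a new subscription, `Ok(false)` if we were
    /// already subscribed, and an error if the subscription filter rejects the topic or the
    /// announcement cannot be sent.
    ///
    /// Usage sketch (topic name is illustrative only):
    ///
    /// ```ignore
    /// let topic = Topic::<IdentityHash>::new("example-topic");
    /// if gossipsub.subscribe(&topic)? {
    ///     info!("Newly subscribed to {}", topic);
    /// }
    /// ```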
481 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
486 debug!("Subscribing to topic: {}", topic);
487 let topic_hash = topic.hash();
488 if !self.subscription_filter.can_subscribe(&topic_hash) {
489 return Err(SubscriptionError::NotAllowed);
490 }
491
492 if self.mesh.get(&topic_hash).is_some() {
493 debug!("Topic: {} is already in the mesh.", topic);
494 return Ok(false);
495 }
496
497 let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
499 if !peer_list.is_empty() {
500 let event = Arc::new(
501 GossipsubRpc {
502 messages: Vec::new(),
503 subscriptions: vec![GossipsubSubscription {
504 topic_hash: topic_hash.clone(),
505 action: GossipsubSubscriptionAction::Subscribe,
506 }],
507 control_msgs: Vec::new(),
508 }
509 .into_protobuf(),
510 );
511
512 for peer in peer_list {
513 debug!("Sending SUBSCRIBE to peer: {:?}", peer);
514 self.send_message(peer, event.clone())
515 .map_err(SubscriptionError::PublishError)?;
516 }
517 }
518
519 self.join(&topic_hash);
522 info!("Subscribed to topic: {}", topic);
523 Ok(true)
524 }
525
526 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, PublishError> {
530 debug!("Unsubscribing from topic: {}", topic);
531 let topic_hash = topic.hash();
532
533 if self.mesh.get(&topic_hash).is_none() {
534 debug!("Already unsubscribed from topic: {:?}", topic_hash);
535 return Ok(false);
537 }
538
539 let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
541 if !peer_list.is_empty() {
542 let event = Arc::new(
543 GossipsubRpc {
544 messages: Vec::new(),
545 subscriptions: vec![GossipsubSubscription {
546 topic_hash: topic_hash.clone(),
547 action: GossipsubSubscriptionAction::Unsubscribe,
548 }],
549 control_msgs: Vec::new(),
550 }
551 .into_protobuf(),
552 );
553
554 for peer in peer_list {
555 debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
556 self.send_message(peer, event.clone())?;
557 }
558 }
559
560 self.leave(&topic_hash);
563
564 info!("Unsubscribed from topic: {:?}", topic_hash);
565 Ok(true)
566 }
567
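    /// Publishes a message on the given topic and returns the [`MessageId`] it was assigned.
    /// The data is passed through the outbound [`DataTransform`], authored/signed according to
    /// the configured [`MessageAuthenticity`], sent to mesh (or fanout) peers and cached so the
    /// same message is not published twice.
    ///
    /// Usage sketch (topic and payload are illustrative only):
    ///
    /// ```ignore
    /// let topic = Topic::<IdentityHash>::new("example-topic");
    /// match gossipsub.publish(topic, b"hello".to_vec()) {
    ///     Ok(msg_id) => info!("Published {}", msg_id),
    ///     Err(PublishError::Duplicate) => debug!("Message was already published"),
    ///     Err(e) => warn!("Publish failed: {:?}", e),
    /// }
    /// ```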
568 pub fn publish<H: Hasher>(
570 &mut self,
571 topic: Topic<H>,
572 data: impl Into<Vec<u8>>,
573 ) -> Result<MessageId, PublishError> {
574 let data = data.into();
575
576 let transformed_data = self
578 .data_transform
579 .outbound_transform(&topic.hash(), data.clone())?;
580
581 let raw_message = self.build_raw_message(topic.into(), transformed_data)?;
582
        let msg_id = self.config.message_id(&GossipsubMessage {
            source: raw_message.source,
            data: data.clone(),
            sequence_number: raw_message.sequence_number,
            topic: raw_message.topic.clone(),
        });
590
591 let event = Arc::new(
592 GossipsubRpc {
593 subscriptions: Vec::new(),
594 messages: vec![raw_message.clone()],
595 control_msgs: Vec::new(),
596 }
597 .into_protobuf(),
598 );
599
600 if event.encoded_len() > self.config.max_transmit_size() {
602 return Err(PublishError::MessageTooLarge);
603 }
604
605 if self.duplicate_cache.contains(&msg_id) {
607 warn!(
610 "Not publishing a message that has already been published. Msg-id {}",
611 msg_id
612 );
613 return Err(PublishError::Duplicate);
614 }
615
616 debug!("Publishing message: Message_id: {:?} Content: {}", msg_id, String::from_utf8_lossy(&data) );
617
618 let topic_hash = raw_message.topic.clone();
619
620 let mesh_peers_sent =
622 !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;
623
624 let mut recipient_peers = HashSet::new();
625 if let Some(set) = self.topic_peers.get(&topic_hash) {
626 if self.config.flood_publish() {
627 recipient_peers.extend(
629 set.iter()
630 .filter(|p| {
631 self.explicit_peers.contains(*p)
632 || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
633 })
634 .cloned(),
635 );
636 } else {
637 for peer in &self.explicit_peers {
639 if set.contains(peer) {
640 recipient_peers.insert(*peer);
641 }
642 }
643
644 for (peer, kind) in &self.peer_protocols {
646 if kind == &PeerKind::Floodsub
647 && !self
648 .score_below_threshold(peer, |ts| ts.publish_threshold)
649 .0
650 {
651 recipient_peers.insert(*peer);
652 }
653 }
654
655 if self.mesh.get(&topic_hash).is_none() {
657 debug!("Topic: {:?} not in the mesh", topic_hash);
658 if self.fanout.contains_key(&topic_hash) {
660 for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
661 recipient_peers.insert(*peer);
662 }
663 } else {
664 let mesh_n = self.config.mesh_n();
666 let new_peers = get_random_peers(
667 &self.topic_peers,
668 &self.peer_protocols,
669 &topic_hash,
670 mesh_n,
671 {
672 |p| {
673 !self.explicit_peers.contains(p)
674 && !self
675 .score_below_threshold(p, |pst| pst.publish_threshold)
676 .0
677 }
678 },
679 );
680 self.fanout.insert(topic_hash.clone(), new_peers.clone());
682 for peer in new_peers {
683 debug!("Peer added to fanout: {:?}", peer);
684 recipient_peers.insert(peer);
685 }
686 }
687 self.fanout_last_pub
689 .insert(topic_hash.clone(), Instant::now());
690 }
691 }
692 }
693
        if recipient_peers.is_empty() {
            // No topic or fanout peers were selected; fall back to a random sample of connected
            // peers so the message can still propagate.
            let mut rng = thread_rng();
            recipient_peers.extend(
                self.peer_topics
                    .keys()
                    .cloned()
                    .choose_multiple(&mut rng, self.config.mesh_n()),
            );
        }
703
704 if recipient_peers.is_empty() && !mesh_peers_sent {
705 return Err(PublishError::InsufficientPeers);
706 }
707
708 self.duplicate_cache.insert(msg_id.clone());
711 self.mcache.put(&msg_id, raw_message);
712
713 if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
716 if !self.config.allow_self_origin() {
717 self.published_message_ids.insert(msg_id.clone());
718 }
719 }
720
721 for peer_id in recipient_peers.iter() {
723 debug!("Sending message to peer: {:?}", peer_id);
724 self.send_message(*peer_id, event.clone())?;
725 }
726
727 info!("Published message: {:?}", &msg_id);
728 Ok(msg_id)
729 }
730
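    /// Reports the result of an application-level message validation. This is only relevant
    /// when `validate_messages()` is enabled in the config: accepted messages are forwarded to
    /// the network, rejected ones cause the propagating peer to be disconnected and penalised,
    /// and ignored ones are removed from the cache without being forwarded.
    ///
    /// Returns `Ok(true)` if the message was still in the cache and the result was applied.
    ///
    /// Sketch of the usual flow after receiving a [`GossipsubEvent::Message`]:
    ///
    /// ```ignore
    /// gossipsub.report_message_validation_result(
    ///     &message_id,
    ///     &propagation_source,
    ///     MessageAcceptance::Accept,
    /// )?;
    /// ```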
731 pub fn report_message_validation_result(
751 &mut self,
752 msg_id: &MessageId,
753 propagation_source: &PeerId,
754 acceptance: MessageAcceptance,
755 ) -> Result<bool, PublishError> {
756 let reject_reason = match acceptance {
757 MessageAcceptance::Accept => {
758 let raw_message = match self.mcache.validate(msg_id) {
759 Some(raw_message) => raw_message.clone(),
760 None => {
761 warn!(
762 "Message not in cache. Ignoring forwarding. Message Id: {}",
763 msg_id
764 );
765 return Ok(false);
766 }
767 };
768 self.forward_msg(msg_id, raw_message, Some(propagation_source))?;
769 return Ok(true);
770 }
            MessageAcceptance::Reject => {
                // A rejected message also gets the propagating peer disconnected and banned.
                self.disconnect_peer(*propagation_source, true);
                RejectReason::ValidationFailed
            }
775 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
776 };
777
778 if let Some(raw_message) = self.mcache.remove(msg_id) {
779 if let Some((peer_score, ..)) = &mut self.peer_score {
781 peer_score.reject_message(
782 propagation_source,
783 msg_id,
784 &raw_message.topic,
785 reject_reason,
786 );
787 }
788 Ok(true)
789 } else {
790 warn!("Rejected message not in cache. Message Id: {}", msg_id);
791 Ok(false)
792 }
793 }
794
795 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
797 debug!("Adding explicit peer {}", peer_id);
798
799 self.explicit_peers.insert(*peer_id);
800
801 self.check_explicit_peer_connection(peer_id);
802 }
803
804 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
807 debug!("Removing explicit peer {}", peer_id);
808 self.explicit_peers.remove(peer_id);
809 }
810
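    /// Blacklists a peer for the given duration. Messages from (or claiming to originate from)
    /// a blacklisted peer are rejected, and the entry is removed again once the duration has
    /// elapsed (checked periodically in the heartbeat).
    ///
    /// Sketch (duration is illustrative):
    ///
    /// ```ignore
    /// gossipsub.blacklist_peer(&peer_id, Duration::from_secs(3600));
    /// ```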
    pub fn blacklist_peer(&mut self, peer_id: &PeerId, duration: Duration) {
        if self.blacklisted_peers.insert(*peer_id, Instant::now().add(duration)).is_none() {
            debug!("Peer has been blacklisted: {}", peer_id);
        }
    }
818
819 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
821 if self.blacklisted_peers.remove(peer_id).is_some() {
822 debug!("Peer has been removed from the blacklist: {}", peer_id);
823 }
824 }
825
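    /// Activates the peer-scoring system with the given parameters and thresholds. This can
    /// only be done once; a second call returns an error.
    ///
    /// Sketch using the crate's default score parameters (tune them for a real deployment):
    ///
    /// ```ignore
    /// gossipsub
    ///     .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
    ///     .expect("valid score parameters");
    /// ```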
826 pub fn with_peer_score(
830 &mut self,
831 params: PeerScoreParams,
832 threshold: PeerScoreThresholds,
833 ) -> Result<(), String> {
834 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
835 }
836
837 pub fn with_peer_score_and_message_delivery_time_callback(
840 &mut self,
841 params: PeerScoreParams,
842 threshold: PeerScoreThresholds,
843 callback: Option<fn(&PeerId, &TopicHash, f64)>,
844 ) -> Result<(), String> {
845 params.validate()?;
846 threshold.validate()?;
847
848 if self.peer_score.is_some() {
849 return Err("Peer score set twice".into());
850 }
851
852 let interval = Interval::new(params.decay_interval);
853 let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
854 self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
855 Ok(())
856 }
857
858 pub fn set_topic_params<H: Hasher>(
862 &mut self,
863 topic: Topic<H>,
864 params: TopicScoreParams,
865 ) -> Result<(), &'static str> {
866 if let Some((peer_score, ..)) = &mut self.peer_score {
867 peer_score.set_topic_params(topic.hash(), params);
868 Ok(())
869 } else {
870 Err("Peer score must be initialised with `with_peer_score()`")
871 }
872 }
873
874 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
877 if let Some((peer_score, ..)) = &mut self.peer_score {
878 peer_score.set_application_score(peer_id, new_score)
879 } else {
880 false
881 }
882 }
883
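    /// Gossipsub JOIN(topic): adds this node to the mesh for the given topic. Fanout peers
    /// that pass the score/backoff checks are moved into the mesh first, the mesh is then
    /// filled up to `mesh_n` with random peers, and a GRAFT control message is queued for
    /// every peer that was added.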
884 fn join(&mut self, topic_hash: &TopicHash) {
886 debug!("Running JOIN for topic: {:?}", topic_hash);
887
888 if self.mesh.contains_key(topic_hash) {
890 info!("JOIN: The topic is already in the mesh, ignoring JOIN");
891 return;
892 }
893
894 let mut added_peers = HashSet::new();
895
896 if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
899 debug!(
900 "JOIN: Removing peers from the fanout for topic: {:?}",
901 topic_hash
902 );
903
904 peers = peers
906 .into_iter()
907 .filter(|p| {
908 !self.explicit_peers.contains(p)
909 && !self.score_below_threshold(p, |_| 0.0).0
910 && !self.backoffs.is_backoff_with_slack(topic_hash, p)
911 })
912 .collect();
913
914 let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
917 debug!(
918 "JOIN: Adding {:?} peers from the fanout for topic: {:?}",
919 add_peers, topic_hash
920 );
921 added_peers.extend(peers.iter().cloned().take(add_peers));
922 self.mesh.insert(
923 topic_hash.clone(),
924 peers.into_iter().take(add_peers).collect(),
925 );
926 self.fanout_last_pub.remove(topic_hash);
928 }
929
930 if added_peers.len() < self.config.mesh_n() {
932 let new_peers = get_random_peers(
934 &self.topic_peers,
935 &self.peer_protocols,
936 topic_hash,
937 self.config.mesh_n() - added_peers.len(),
938 |peer| {
939 !added_peers.contains(peer)
940 && !self.explicit_peers.contains(peer)
941 && !self.score_below_threshold(peer, |_| 0.0).0
942 && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
943 },
944 );
945 added_peers.extend(new_peers.clone());
946 debug!(
948 "JOIN: Inserting {:?} random peers into the mesh",
949 new_peers.len()
950 );
951 let mesh_peers = self
952 .mesh
953 .entry(topic_hash.clone())
954 .or_insert_with(Default::default);
955 mesh_peers.extend(new_peers);
956 }
957
958 for peer_id in added_peers {
959 info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
961 if let Some((peer_score, ..)) = &mut self.peer_score {
962 peer_score.graft(&peer_id, topic_hash.clone());
963 }
964 Self::control_pool_add(
965 &mut self.control_pool,
966 peer_id,
967 GossipsubControlAction::Graft {
968 topic_hash: topic_hash.clone(),
969 },
970 );
971 }
972 debug!("Completed JOIN for topic: {:?}", topic_hash);
973 }
974
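    /// Builds a PRUNE control message for a peer/topic pair, updating the peer score and the
    /// local backoff storage. Peer-exchange candidates are only attached when `do_px` is set;
    /// gossipsub v1.0 peers receive a prune without peer exchange or backoff.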
975 fn make_prune(
977 &mut self,
978 topic_hash: &TopicHash,
979 peer: &PeerId,
980 do_px: bool,
981 ) -> GossipsubControlAction {
982 if let Some((peer_score, ..)) = &mut self.peer_score {
983 peer_score.prune(peer, topic_hash.clone());
984 }
985
        match self.peer_protocols.get(peer) {
            Some(PeerKind::Floodsub) => {
                error!("Attempted to prune a Floodsub peer");
            }
            Some(PeerKind::Gossipsub) => {
                // Gossipsub v1.0 peers do not understand peer exchange or backoff.
                return GossipsubControlAction::Prune {
                    topic_hash: topic_hash.clone(),
                    peers: Vec::new(),
                    backoff: None,
                };
            }
            None => {
                error!("Attempted to prune an unknown peer");
            }
            _ => {}
        }
1003
1004 let peers = if do_px {
1006 get_random_peers(
1007 &self.topic_peers,
1008 &self.peer_protocols,
1009 &topic_hash,
1010 self.config.prune_peers(),
1011 |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1012 )
1013 .into_iter()
1014 .map(|p| PeerInfo { peer_id: Some(p) })
1015 .collect()
1016 } else {
1017 Vec::new()
1018 };
1019
1020 self.backoffs
1022 .update_backoff(topic_hash, peer, self.config.prune_backoff());
1023
1024 GossipsubControlAction::Prune {
1025 topic_hash: topic_hash.clone(),
1026 peers,
1027 backoff: Some(self.config.prune_backoff().as_secs()),
1028 }
1029 }
1030
1031 fn leave(&mut self, topic_hash: &TopicHash) {
1033 debug!("Running LEAVE for topic {:?}", topic_hash);
1034
1035 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1037 for peer in peers {
1038 info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
1040 let control = self.make_prune(topic_hash, &peer, self.config.do_px());
1041 Self::control_pool_add(&mut self.control_pool, peer, control);
1042 }
1043 }
1044 debug!("Completed LEAVE for topic: {:?}", topic_hash);
1045 }
1046
1047 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1049 if !self.peer_topics.contains_key(peer_id) {
1050 debug!("Connecting to explicit peer {:?}", peer_id);
1052 self.events.push_back(NetworkBehaviourAction::DialPeer {
1053 peer_id: *peer_id,
1054 condition: DialPeerCondition::Disconnected,
1055 });
1056 }
1057 }
1058
1059 fn score_below_threshold(
1062 &self,
1063 peer_id: &PeerId,
1064 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1065 ) -> (bool, f64) {
1066 Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1067 }
1068
1069 fn score_below_threshold_from_scores(
1070 peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
1071 peer_id: &PeerId,
1072 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1073 ) -> (bool, f64) {
1074 if let Some((peer_score, thresholds, ..)) = peer_score {
1075 let score = peer_score.score(peer_id);
1076 if score < threshold(thresholds) {
1077 return (true, score);
1078 }
1079 (false, score)
1080 } else {
1081 (false, 0.0)
1082 }
1083 }
1084
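    /// Handles an IHAVE control message. Checks the sender's score and the per-heartbeat
    /// IHAVE/IWANT budgets, then requests (IWANT) the advertised message ids that we have not
    /// seen yet, recording a gossip promise so broken promises can be penalised later.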
1085 fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1088 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1090 debug!(
1091 "IHAVE: ignoring peer {:?} with score below threshold [score = {}]",
1092 peer_id, score
1093 );
1094 return;
1095 }
1096
1097 let peer_have = self
1099 .count_received_ihave
1100 .entry(*peer_id)
1101 .or_insert(0);
1102 *peer_have += 1;
1103 if *peer_have > self.config.max_ihave_messages() {
1104 debug!(
1105 "IHAVE: peer {} has advertised too many times ({}) within this heartbeat \
1106 interval; ignoring",
1107 peer_id, *peer_have
1108 );
1109 return;
1110 }
1111
1112 if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1113 if *iasked >= self.config.max_ihave_length() {
1114 debug!(
1115 "IHAVE: peer {} has already advertised too many messages ({}); ignoring",
1116 peer_id, *iasked
1117 );
1118 return;
1119 }
1120 }
1121
1122 debug!("Handling IHAVE for peer: {:?}", peer_id);
1123
1124 let mut iwant_ids = HashSet::new();
1126
1127 for (topic, ids) in ihave_msgs {
1128 if !self.mesh.contains_key(&topic) {
1130 debug!(
1131 "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
1132 topic
1133 );
1134 continue;
1135 }
1136
1137 for id in ids {
1138 if !self.duplicate_cache.contains(&id) {
1139 iwant_ids.insert(id);
1141 }
1142 }
1143 }
1144
1145 if !iwant_ids.is_empty() {
1146 let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1147 let mut iask = iwant_ids.len();
1148 if *iasked + iask > self.config.max_ihave_length() {
1149 iask = self.config.max_ihave_length().saturating_sub(*iasked);
1150 }
1151
1152 debug!(
1154 "IHAVE: Asking for {} out of {} messages from {}",
1155 iask,
1156 iwant_ids.len(),
1157 peer_id
1158 );
1159
1160 let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
1162 let mut rng = thread_rng();
1163 iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
1164
1165 iwant_ids_vec.truncate(iask as usize);
1166 *iasked += iask;
1167
1168 let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
1169 if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
1170 gossip_promises.add_promise(
1171 *peer_id,
1172 &message_ids,
1173 Instant::now() + self.config.iwant_followup_time(),
1174 );
1175 }
1176 debug!(
1177 "IHAVE: Asking for the following messages from {}: {:?}",
1178 peer_id, message_ids
1179 );
1180
1181 Self::control_pool_add(
1182 &mut self.control_pool,
1183 *peer_id,
1184 GossipsubControlAction::IWant { message_ids },
1185 );
1186 }
1187 debug!("Completed IHAVE handling for peer: {:?}", peer_id);
1188 }
1189
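    /// Handles an IWANT control message. Looks up the requested message ids in the message
    /// cache and sends back those that have not been re-requested too many times.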
1190 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1193 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1195 debug!(
1196 "IWANT: ignoring peer {:?} with score below threshold [score = {}]",
1197 peer_id, score
1198 );
1199 return;
1200 }
1201
1202 debug!("Handling IWANT for peer: {:?}", peer_id);
1203 let mut cached_messages = HashMap::new();
1205
1206 for id in iwant_msgs {
1207 if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
1210 if count > self.config.gossip_retransimission() {
1211 debug!(
1212 "IWANT: Peer {} has asked for message {} too many times; ignoring \
1213 request",
1214 peer_id, &id
1215 );
1216 } else {
1217 cached_messages.insert(id.clone(), msg.clone());
1218 }
1219 }
1220 }
1221
1222 if !cached_messages.is_empty() {
1223 debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
1224 let message_list = cached_messages
1226 .into_iter()
1227 .map(|entry| entry.1)
1228 .collect();
1229 if self
1230 .send_message(
1231 *peer_id,
1232 GossipsubRpc {
1233 subscriptions: Vec::new(),
1234 messages: message_list,
1235 control_msgs: Vec::new(),
1236 }
1237 .into_protobuf(),
1238 )
1239 .is_err()
1240 {
1241 error!("Failed to send cached messages. Messages too large");
1242 }
1243 }
1244 debug!("Completed IWANT handling for peer: {}", peer_id);
1245 }
1246
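    /// Handles a GRAFT control message. The peer is added to the mesh for each requested topic
    /// unless it is an explicit peer, is still within its prune backoff, has a negative score,
    /// or the mesh is already full; in those cases a PRUNE is sent back instead.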
1247 fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1250 debug!("Handling GRAFT message for peer: {}", peer_id);
1251
1252 let mut to_prune_topics = HashSet::new();
1253
1254 let mut do_px = self.config.do_px();
1255
        if self.explicit_peers.contains(peer_id) {
            warn!("GRAFT: ignoring request from direct peer {}", peer_id);
            // Prune all requested topics and never offer peer exchange to explicit peers.
            to_prune_topics = HashSet::from_iter(topics.into_iter());
            do_px = false;
        } else {
1264 let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1265 let now = Instant::now();
1266 for topic_hash in topics {
1267 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1268 if peers.contains(peer_id) {
1270 debug!(
1271 "GRAFT: Received graft for peer {:?} that is already in topic {:?}",
1272 peer_id, &topic_hash
1273 );
1274 continue;
1275 }
1276
1277 if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1279 {
1280 if backoff_time > now {
1281 warn!(
1282 "GRAFT: peer attempted graft within backoff time, penalizing {}",
1283 peer_id
1284 );
1285 if let Some((peer_score, ..)) = &mut self.peer_score {
1287 peer_score.add_penalty(peer_id, 1);
1288
1289 let flood_cutoff = (backoff_time
1291 + self.config.graft_flood_threshold())
1292 - self.config.prune_backoff();
1293 if flood_cutoff > now {
1294 peer_score.add_penalty(peer_id, 1);
1296 }
1297 }
1298 do_px = false;
1300
1301 to_prune_topics.insert(topic_hash.clone());
1302 continue;
1303 }
1304 }
1305
1306 if below_zero {
1308 debug!(
1310 "GRAFT: ignoring peer {:?} with negative score [score = {}, \
1311 topic = {}]",
1312 peer_id, score, topic_hash
1313 );
1314 to_prune_topics.insert(topic_hash.clone());
1316 do_px = false;
1318 continue;
1319 }
1320
1321 if peers.len() >= self.config.mesh_n_high()
1324 && !self.outbound_peers.contains(peer_id)
1325 {
1326 to_prune_topics.insert(topic_hash.clone());
1327 continue;
1328 }
1329
1330 info!(
1332 "GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
1333 peer_id, &topic_hash
1334 );
1335 peers.insert(*peer_id);
1336
1337 if let Some((peer_score, ..)) = &mut self.peer_score {
1338 peer_score.graft(peer_id, topic_hash);
1339 }
1340 } else {
1341 do_px = false;
1343 debug!(
1344 "GRAFT: Received graft for unknown topic {:?} from peer {:?}",
1345 &topic_hash, peer_id
1346 );
1347 continue;
1349 }
1350 }
1351 }
1352
1353 if !to_prune_topics.is_empty() {
1354 let prune_messages = to_prune_topics
1356 .iter()
1357 .map(|t| self.make_prune(t, peer_id, do_px))
1358 .collect();
1359 info!(
1361 "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {}",
1362 peer_id
1363 );
1364
1365 if self
1366 .send_message(
1367 *peer_id,
1368 GossipsubRpc {
1369 subscriptions: Vec::new(),
1370 messages: Vec::new(),
1371 control_msgs: prune_messages,
1372 }
1373 .into_protobuf(),
1374 )
1375 .is_err()
1376 {
1377 error!("Failed to send graft. Message too large");
1378 }
1379 }
1380 debug!("Completed GRAFT handling for peer: {}", peer_id);
1381 }
1382
1383 fn remove_peer_from_mesh(
1384 &mut self,
1385 peer_id: &PeerId,
1386 topic_hash: &TopicHash,
1387 backoff: Option<u64>,
1388 always_update_backoff: bool,
1389 ) {
1390 let mut update_backoff = always_update_backoff;
1391 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1392 if peers.remove(peer_id) {
1394 info!(
1395 "PRUNE: Removing peer: {} from the mesh for topic: {}",
1396 peer_id.to_string(),
1397 topic_hash
1398 );
1399
1400 if let Some((peer_score, ..)) = &mut self.peer_score {
1401 peer_score.prune(peer_id, topic_hash.clone());
1402 }
1403
1404 update_backoff = true;
1405 }
1406 }
1407 if update_backoff {
1408 let time = if let Some(backoff) = backoff {
1409 Duration::from_secs(backoff)
1410 } else {
1411 self.config.prune_backoff()
1412 };
1413 self.backoffs.update_backoff(&topic_hash, peer_id, time);
1415 }
1416 }
1417
1418 fn handle_prune(
1420 &mut self,
1421 peer_id: &PeerId,
1422 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1423 ) {
1424 debug!("Handling PRUNE message for peer: {}", peer_id);
1425 let (below_threshold, score) =
1426 self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1427 for (topic_hash, px, backoff) in prune_data {
1428 self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);
1429
1430 if self.mesh.contains_key(&topic_hash) {
1431 if !px.is_empty() {
1433 if below_threshold {
1435 debug!(
1436 "PRUNE: ignoring PX from peer {:?} with insufficient score \
1437 [score ={} topic = {}]",
1438 peer_id, score, topic_hash
1439 );
1440 continue;
1441 }
1442
1443 if self.config.prune_peers() > 0 {
1450 self.px_connect(px);
1451 }
1452 }
1453 }
1454 }
1455 debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
1456 }
1457
1458 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1459 let n = self.config.prune_peers();
1460 px = px.into_iter().filter(|p| p.peer_id.is_some()).collect();
1465 if px.len() > n {
1466 let mut rng = thread_rng();
1468 px.partial_shuffle(&mut rng, n);
1469 px = px.into_iter().take(n).collect();
1470 }
1471
1472 for p in px {
1473 if let Some(peer_id) = p.peer_id {
1476 self.px_peers.insert(peer_id);
1478
1479 self.events.push_back(NetworkBehaviourAction::DialPeer {
1481 peer_id,
1482 condition: DialPeerCondition::Disconnected,
1483 });
1484 }
1485 }
1486 }
1487
1488 fn message_is_valid(
1491 &mut self,
1492 msg_id: &MessageId,
1493 raw_message: &mut RawGossipsubMessage,
1494 propagation_source: &PeerId,
1495 ) -> bool {
1496 debug!(
1497 "Handling message: {:?} from peer: {}",
1498 msg_id,
1499 propagation_source.to_string(),
1500 );
1501
1502 if self.blacklisted_peers.contains_key(propagation_source) {
1504 debug!(
1505 "Rejecting message from blacklisted peer: {}",
1506 propagation_source
1507 );
1508 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1509 peer_score.reject_message(
1510 propagation_source,
1511 msg_id,
1512 &raw_message.topic,
1513 RejectReason::BlackListedPeer,
1514 );
1515 gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
1516 }
1517 self.disconnect_peer(*propagation_source, true);
1519 return false;
1520 }
1521
1522 if let Some(source) = raw_message.source.as_ref() {
1524 if self.blacklisted_peers.contains_key(source) {
1525 debug!(
1526 "Rejecting message from peer {} because of blacklisted source: {}",
1527 propagation_source, source
1528 );
1529 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1530 peer_score.reject_message(
1531 propagation_source,
1532 msg_id,
1533 &raw_message.topic,
1534 RejectReason::BlackListedSource,
1535 );
1536 gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource);
1537 }
1538 return false;
1539 }
1540 }
1541
1542 if !self.config.validate_messages() {
1546 raw_message.validated = true;
1547 }
1548
1549 let self_published = !self.config.allow_self_origin()
1551 && if let Some(own_id) = self.publish_config.get_own_id() {
1552 own_id != propagation_source
1553 && raw_message.source.as_ref().map_or(false, |s| s == own_id)
1554 } else {
1555 self.published_message_ids.contains(&msg_id)
1556 };
1557
1558 if self_published {
1559 debug!(
1560 "Dropping message {} claiming to be from self but forwarded from {}",
1561 msg_id, propagation_source
1562 );
1563 if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
1564 peer_score.reject_message(
1565 propagation_source,
1566 msg_id,
1567 &raw_message.topic,
1568 RejectReason::SelfOrigin,
1569 );
1570 gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin);
1571 }
1572 return false;
1573 }
1574
1575 true
1576 }
1577
1578 fn handle_received_message(
1582 &mut self,
1583 mut raw_message: RawGossipsubMessage,
1584 propagation_source: &PeerId,
1585 ) {
1586 let fast_message_id = self.config.fast_message_id(&raw_message);
1587 if let Some(fast_message_id) = fast_message_id.as_ref() {
1588 if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) {
1589 let msg_id = msg_id.clone();
1590 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1591 self.handle_invalid_message(
1592 propagation_source,
1593 raw_message,
1594 ValidationError::TransformFailed,
1595 );
1596 return;
1597 }
1598 if let Some((peer_score, ..)) = &mut self.peer_score {
1599 peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic);
1600 }
1601 return;
1602 }
1603 }
1604
1605 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1607 Ok(message) => message,
1608 Err(e) => {
1609 debug!("Invalid message. Transform error: {:?}", e);
1610 self.handle_invalid_message(
1612 propagation_source,
1613 raw_message,
1614 ValidationError::TransformFailed,
1615 );
1616 return;
1617 }
1618 };
1619
1620 let msg_id = self.config.message_id(&message);
1622
1623 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1627 return;
1628 }
1629
1630 if let Some(fast_message_id) = fast_message_id {
1632 self.fast_messsage_id_cache
1634 .entry(fast_message_id)
1635 .or_insert_with(|| msg_id.clone());
1636 }
1637 if !self.duplicate_cache.insert(msg_id.clone()) {
1638 debug!(
1639 "Message already received, ignoring. Message: {}",
1640 msg_id
1641 );
1642 if let Some((peer_score, ..)) = &mut self.peer_score {
1643 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1644 }
1645 return;
1646 }
1647 debug!(
1648 "Put message {:?} in duplicate_cache and resolve promises",
1649 msg_id
1650 );
1651
1652 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1655 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1656 gossip_promises.message_delivered(&msg_id);
1657 }
1658
1659 self.mcache.put(&msg_id, raw_message.clone());
1661
1662 if self.config.validate_messages() || self.mesh.contains_key(&message.topic) {
1665 debug!("Sending received message to user");
1666 self.events.push_back(NetworkBehaviourAction::GenerateEvent(
1667 GossipsubEvent::Message {
1668 propagation_source: *propagation_source,
1669 message_id: msg_id.clone(),
1670 message,
1671 },
1672 ));
1673 } else {
1674 debug!(
1675 "Received message on a topic we are not subscribed to: {:?}",
1676 message.topic
1677 );
1678 return;
1679 }
1680
1681 if !self.config.validate_messages() {
1683 if self
1684 .forward_msg(&msg_id, raw_message, Some(propagation_source))
1685 .is_err()
1686 {
1687 error!("Failed to forward message. Too large");
1688 }
1689 debug!("Completed message handling for message: {:?}", msg_id);
1690 }
1691 }
1692
1693 fn handle_invalid_message(
1695 &mut self,
1696 propagation_source: &PeerId,
1697 raw_message: RawGossipsubMessage,
1698 validation_error: ValidationError,
1699 ) {
1700 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1701 let reason = RejectReason::ValidationError(validation_error);
1702 let fast_message_id_cache = &self.fast_messsage_id_cache;
1703 if let Some(msg_id) = self
1704 .config
1705 .fast_message_id(&raw_message)
1706 .and_then(|id| fast_message_id_cache.get(&id))
1707 {
1708 peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason);
1709 gossip_promises.reject_message(msg_id, &reason);
1710 } else {
1711 peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1715 }
1716 }
1717 }
1718
1719 fn handle_received_subscriptions(
1721 &mut self,
1722 subscriptions: &[GossipsubSubscription],
1723 propagation_source: &PeerId,
1724 ) {
1725 debug!(
1726 "Handling subscriptions: {:?}, from source: {}",
1727 subscriptions,
1728 propagation_source.to_string()
1729 );
1730
1731 let mut unsubscribed_peers = Vec::new();
1732
1733 let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
1734 Some(topics) => topics,
1735 None => {
1736 error!(
1737 "Subscription by unknown peer: {}",
1738 propagation_source.to_string()
1739 );
1740 return;
1741 }
1742 };
1743
1744 let mut grafts = Vec::new();
1746
1747 let mut application_event = Vec::new();
1749
1750 let filtered_topics = match self
1751 .subscription_filter
1752 .filter_incoming_subscriptions(subscriptions, subscribed_topics)
1753 {
1754 Ok(topics) => topics,
1755 Err(s) => {
1756 error!(
1757 "Subscription filter error: {}; ignoring RPC from peer {}",
1758 s,
1759 propagation_source.to_string()
1760 );
1761 return;
1762 }
1763 };
1764
1765 for subscription in filtered_topics {
1766 let peer_list = self
1768 .topic_peers
1769 .entry(subscription.topic_hash.clone())
1770 .or_insert_with(Default::default);
1771
1772 match subscription.action {
1773 GossipsubSubscriptionAction::Subscribe => {
1774 if peer_list.insert(*propagation_source) {
1775 debug!(
1776 "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
1777 propagation_source.to_string(),
1778 subscription.topic_hash
1779 );
1780 }
1781
1782 subscribed_topics.insert(subscription.topic_hash.clone());
1784
1785 if !self.explicit_peers.contains(propagation_source)
1787 && match self.peer_protocols.get(propagation_source) {
1788 Some(PeerKind::Gossipsubv1_1) => true,
1789 Some(PeerKind::Gossipsub) => true,
1790 _ => false,
1791 }
1792 && !Self::score_below_threshold_from_scores(
1793 &self.peer_score,
1794 propagation_source,
1795 |_| 0.0,
1796 )
1797 .0
1798 && !self
1799 .backoffs
1800 .is_backoff_with_slack(&subscription.topic_hash, propagation_source)
1801 {
1802 if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
1803 if peers.len() < self.config.mesh_n_low()
1804 && peers.insert(*propagation_source)
1805 {
1806 debug!(
1807 "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
1808 propagation_source.to_string(),
1809 subscription.topic_hash
1810 );
1811 debug!(
1813 "Sending GRAFT to peer {} for topic {:?}",
1814 propagation_source.to_string(),
1815 subscription.topic_hash
1816 );
1817 if let Some((peer_score, ..)) = &mut self.peer_score {
1818 peer_score
1819 .graft(propagation_source, subscription.topic_hash.clone());
1820 }
1821 grafts.push(GossipsubControlAction::Graft {
1822 topic_hash: subscription.topic_hash.clone(),
1823 });
1824 }
1825 }
1826 }
1827 application_event.push(NetworkBehaviourAction::GenerateEvent(
1829 GossipsubEvent::Subscribed {
1830 peer_id: *propagation_source,
1831 topic: subscription.topic_hash.clone(),
1832 },
1833 ));
1834 }
1835 GossipsubSubscriptionAction::Unsubscribe => {
1836 if peer_list.remove(propagation_source) {
1837 info!(
1838 "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
1839 propagation_source.to_string(),
1840 subscription.topic_hash
1841 );
1842 }
1843 subscribed_topics.remove(&subscription.topic_hash);
1845 unsubscribed_peers
1846 .push((*propagation_source, subscription.topic_hash.clone()));
1847 application_event.push(NetworkBehaviourAction::GenerateEvent(
1849 GossipsubEvent::Unsubscribed {
1850 peer_id: *propagation_source,
1851 topic: subscription.topic_hash.clone(),
1852 },
1853 ));
1854 }
1855 }
1856 }
1857
1858 for (peer_id, topic_hash) in unsubscribed_peers {
1860 self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
1861 }
1862
1863 if !grafts.is_empty()
1866 && self
1867 .send_message(
1868 *propagation_source,
1869 GossipsubRpc {
1870 subscriptions: Vec::new(),
1871 messages: Vec::new(),
1872 control_msgs: grafts,
1873 }
1874 .into_protobuf(),
1875 )
1876 .is_err()
1877 {
1878 error!("Failed sending grafts. Message too large");
1879 }
1880
1881 for event in application_event {
1883 self.events.push_back(event);
1884 }
1885
1886 trace!(
1887 "Completed handling subscriptions from source: {:?}",
1888 propagation_source
1889 );
1890 }
1891
1892 fn apply_iwant_penalties(&mut self) {
1894 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1895 for (peer, count) in gossip_promises.get_broken_promises() {
1896 peer_score.add_penalty(&peer, count);
1897 }
1898 }
1899 }
1900
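    /// Heartbeat procedure, run every `heartbeat_interval`: applies IWANT penalties, keeps
    /// every mesh within `[mesh_n_low, mesh_n_high]`, enforces the outbound-peer minimum,
    /// performs opportunistic grafting, expires and refills fanout sets, emits IHAVE gossip,
    /// sends the queued GRAFT/PRUNE messages, flushes the control pool and shifts the message
    /// cache. It also trims excess connections, periodically shares the peer list with a
    /// random peer and expires old blacklist entries.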
1901 fn heartbeat(&mut self) {
1903 self.heartbeat_ticks += 1;
1904
1905 let mut to_graft = HashMap::new();
1906 let mut to_prune = HashMap::new();
1907 let mut no_px = HashSet::new();
1908
1909 self.backoffs.heartbeat();
1911
1912 self.count_sent_iwant.clear();
1914 self.count_received_ihave.clear();
1915
1916 self.apply_iwant_penalties();
1918
1919 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
1921 for p in self.explicit_peers.clone() {
1922 self.check_explicit_peer_connection(&p);
1923 }
1924 }
1925
1926 let mut scores = HashMap::new();
1928 let peer_score = &self.peer_score;
1929 let mut score = |p: &PeerId| match peer_score {
1930 Some((peer_score, ..)) => *scores
1931 .entry(*p)
1932 .or_insert_with(|| peer_score.score(p)),
1933 _ => 0.0,
1934 };
1935
1936 for (topic_hash, peers) in self.mesh.iter_mut() {
1938 let explicit_peers = &self.explicit_peers;
1939 let backoffs = &self.backoffs;
1940 let topic_peers = &self.topic_peers;
1941 let outbound_peers = &self.outbound_peers;
1942
1943 let to_remove: Vec<_> = peers
1947 .iter()
1948 .filter(|&p| {
1949 if score(p) < 0.0 {
1950 debug!(
1951 "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
1952 {}]",
1953 p,
1954 score(p),
1955 topic_hash
1956 );
1957
1958 let current_topic = to_prune.entry(*p).or_insert_with(Vec::new);
1959 current_topic.push(topic_hash.clone());
1960 no_px.insert(*p);
1961 true
1962 } else {
1963 false
1964 }
1965 })
1966 .cloned()
1967 .collect();
1968 for peer in to_remove {
1969 peers.remove(&peer);
1970 }
1971
1972 if peers.len() < self.config.mesh_n_low() {
1974 debug!(
1975 "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}",
1976 topic_hash,
1977 peers.len(),
1978 self.config.mesh_n_low()
1979 );
1980 let desired_peers = self.config.mesh_n() - peers.len();
1982 let peer_list = get_random_peers(
1983 topic_peers,
1984 &self.peer_protocols,
1985 topic_hash,
1986 desired_peers,
1987 |peer| {
1988 !peers.contains(peer)
1989 && !explicit_peers.contains(peer)
1990 && !backoffs.is_backoff_with_slack(topic_hash, peer)
1991 && score(peer) >= 0.0
1992 },
1993 );
1994 for peer in &peer_list {
1995 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
1996 current_topic.push(topic_hash.clone());
1997 }
1998 debug!("Updating mesh, new mesh: {:?}", peer_list);
2000 peers.extend(peer_list);
2001 }
2002
2003 if peers.len() > self.config.mesh_n_high() {
2005 debug!(
2006 "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}",
2007 topic_hash,
2008 peers.len(),
2009 self.config.mesh_n_high()
2010 );
2011 let excess_peer_no = peers.len() - self.config.mesh_n();
2012
2013 let mut rng = thread_rng();
2015 let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
2016 shuffled.shuffle(&mut rng);
2017 shuffled
2018 .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal));
2019 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2021
2022 let mut outbound = {
2024 let outbound_peers = &self.outbound_peers;
2025 shuffled
2026 .iter()
2027 .filter(|p| outbound_peers.contains(*p))
2028 .count()
2029 };
2030
2031 let mut removed = 0;
2034 for peer in shuffled {
2035 if removed == excess_peer_no {
2036 break;
2037 }
2038 if self.outbound_peers.contains(&peer) {
2039 if outbound <= self.config.mesh_outbound_min() {
2040 continue;
2042 } else {
2043 outbound -= 1;
2045 }
2046 }
2047
2048 peers.remove(&peer);
2050 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2051 current_topic.push(topic_hash.clone());
2052 removed += 1;
2053 }
2054 }
2055
2056 if peers.len() >= self.config.mesh_n_low() {
2058 let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2060
2061 if outbound < self.config.mesh_outbound_min() {
2063 let needed = self.config.mesh_outbound_min() - outbound;
2064 let peer_list = get_random_peers(
2065 topic_peers,
2066 &self.peer_protocols,
2067 topic_hash,
2068 needed,
2069 |peer| {
2070 !peers.contains(peer)
2071 && !explicit_peers.contains(peer)
2072 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2073 && score(peer) >= 0.0
2074 && outbound_peers.contains(peer)
2075 },
2076 );
2077 for peer in &peer_list {
2078 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2079 current_topic.push(topic_hash.clone());
2080 }
2081 debug!("Updating mesh, new mesh: {:?}", peer_list);
2083 peers.extend(peer_list);
2084 }
2085 }
2086
2087 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2089 && peers.len() > 1
2090 && self.peer_score.is_some()
2091 {
2092 if let Some((_, thresholds, _, _)) = &self.peer_score {
2093 let mut peers_by_score: Vec<_> = peers.iter().collect();
2103 peers_by_score
2104 .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal));
2105
2106 let middle = peers_by_score.len() / 2;
2107 let median = if peers_by_score.len() % 2 == 0 {
2108 (score(
2109 *peers_by_score.get(middle - 1).expect(
2110 "middle < vector length and middle > 0 since peers.len() > 0",
2111 ),
2112 ) + score(*peers_by_score.get(middle).expect("middle < vector length")))
2113 * 0.5
2114 } else {
2115 score(*peers_by_score.get(middle).expect("middle < vector length"))
2116 };
2117
2118 if median < thresholds.opportunistic_graft_threshold {
2121 let peer_list = get_random_peers(
2122 topic_peers,
2123 &self.peer_protocols,
2124 topic_hash,
2125 self.config.opportunistic_graft_peers(),
2126 |peer| {
2127 !peers.contains(peer)
2128 && !explicit_peers.contains(peer)
2129 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2130 && score(peer) > median
2131 },
2132 );
2133 for peer in &peer_list {
2134 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2135 current_topic.push(topic_hash.clone());
2136 }
2137 debug!(
2139 "Opportunistically graft in topic {} with peers {:?}",
2140 topic_hash, peer_list
2141 );
2142 peers.extend(peer_list);
2143 }
2144 }
2145 }
2146 }
2147
2148 {
            let fanout = &mut self.fanout; // help the borrow checker inside the closure below
            let fanout_ttl = self.config.fanout_ttl();
2152 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2153 if *last_pub_time + fanout_ttl < Instant::now() {
2154 debug!(
2155 "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
2156 topic_hash
2157 );
2158 fanout.remove(&topic_hash);
2159 return false;
2160 }
2161 true
2162 });
2163 }
2164
2165 for (topic_hash, peers) in self.fanout.iter_mut() {
2168 let mut to_remove_peers = Vec::new();
2169 let publish_threshold = match &self.peer_score {
2170 Some((_, thresholds, _, _)) => thresholds.publish_threshold,
2171 _ => 0.0,
2172 };
2173 for peer in peers.iter() {
2174 match self.peer_topics.get(peer) {
2176 Some(topics) => {
2177 if !topics.contains(&topic_hash) || score(peer) < publish_threshold {
2178 debug!(
2179 "HEARTBEAT: Peer removed from fanout for topic: {:?}",
2180 topic_hash
2181 );
2182 to_remove_peers.push(*peer);
2183 }
2184 }
2185 None => {
2186 to_remove_peers.push(*peer);
2188 }
2189 }
2190 }
2191 for to_remove in to_remove_peers {
2192 peers.remove(&to_remove);
2193 }
2194
2195 if peers.len() < self.config.mesh_n() {
2197 debug!(
2198 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2199 peers.len(),
2200 self.config.mesh_n()
2201 );
2202 let needed_peers = self.config.mesh_n() - peers.len();
2203 let explicit_peers = &self.explicit_peers;
2204 let new_peers = get_random_peers(
2205 &self.topic_peers,
2206 &self.peer_protocols,
2207 topic_hash,
2208 needed_peers,
2209 |peer| {
2210 !peers.contains(peer)
2211 && !explicit_peers.contains(peer)
2212 && score(peer) < publish_threshold
2213 },
2214 );
2215 peers.extend(new_peers);
2216 }
2217 }
2218
2219 if self.peer_score.is_some() {
2220 trace!("Peer_scores: {:?}", {
2221 for peer in self.peer_topics.keys() {
2222 score(peer);
2223 }
2224 scores
2225 });
2226 trace!("Mesh message deliveries: {:?}", {
2227 self.mesh
2228 .iter()
2229 .map(|(t, peers)| {
2230 (
2231 t.clone(),
2232 peers
2233 .iter()
2234 .map(|p| {
2235 (
2236 *p,
2237 peer_score
2238 .as_ref()
2239 .expect("peer_score.is_some()")
2240 .0
2241 .mesh_message_deliveries(p, t)
2242 .unwrap_or(0.0),
2243 )
2244 })
2245 .collect::<HashMap<PeerId, f64>>(),
2246 )
2247 })
2248 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2249 })
2250 }
2251
2252 self.emit_gossip();
2253
        if !to_graft.is_empty() || !to_prune.is_empty() {
2256 self.send_graft_prune(to_graft, to_prune, no_px);
2257 }
2258
2259 self.flush_control_pool();
2261
2262 self.mcache.shift();
2264
        // Keep the number of connections bounded and periodically share our peer list.
        if self.heartbeat_ticks % self.config.connection_update_ticks() == 0 {
            let mut peers = self.peer_topics.keys().cloned().collect::<Vec<PeerId>>();

            let mut rng = thread_rng();

            // Disconnect random peers (without banning) while we exceed twice mesh_n_high.
            while peers.len() > self.config.mesh_n_high() * 2 {
                let peer_id = peers.remove(rng.next_u32() as usize % peers.len());
                self.disconnect_peer(peer_id, false);
            }

            // Send our list of connected peers to one randomly chosen peer.
            if !peers.is_empty() {
                if let Err(e) = self.send_peer_list(&peers[rng.next_u32() as usize % peers.len()]) {
                    warn!(
                        "Failed to update random peer with a list of connected PeerId, {}",
                        e
                    );
                }
            }
        }
2284
        // Expire old blacklist entries.
        if self.heartbeat_ticks % 23 == 0 {
            let cur_time = Instant::now();
            self.blacklisted_peers.retain(|_, v| *v > cur_time);
        }
2290
2291 if self.heartbeat_ticks % 20 == 0 {
2293 info!("Libp2p all peers: {:?}", self.peer_topics.iter().map(|(k,_v)| k.to_string()).collect::<Vec<String>>().join(", ") );
2295 info!("Libp2p outbound peers: {}", self.outbound_peers.iter().map(|p| p.to_string() ).collect::<Vec<String>>().join(", ") );
2296 info!("Libp2p blacklisted peers: {}", self.blacklisted_peers.iter().map(|p| p.0.to_string() ).collect::<Vec<String>>().join(", ") );
2297 }
2298 }
2299
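    /// Emits IHAVE gossip for recently cached messages to non-mesh peers of every mesh and
    /// fanout topic. The number of peers gossiped to is at least `gossip_lazy`, scaled up by
    /// `gossip_factor`, and each advertisement is truncated to `max_ihave_length` ids.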
2300 fn emit_gossip(&mut self) {
2303 let mut rng = thread_rng();
2304 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2305 let mut message_ids = self.mcache.get_gossip_message_ids(&topic_hash);
            if message_ids.is_empty() {
                // Nothing to gossip for this topic; continue with the remaining topics instead
                // of aborting gossip emission entirely.
                continue;
            }
2309
2310 if message_ids.len() > self.config.max_ihave_length() {
2312 debug!(
2314 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2315 message_ids.len()
2316 );
2317 } else {
2318 message_ids.shuffle(&mut rng);
2320 }
2321
2322 let n_map = |m| {
2324 max(
2325 self.config.gossip_lazy(),
2326 (self.config.gossip_factor() * m as f64) as usize,
2327 )
2328 };
2329 let to_msg_peers = get_random_peers_dynamic(
2331 &self.topic_peers,
2332 &self.peer_protocols,
2333 &topic_hash,
2334 n_map,
2335 |peer| {
2336 !peers.contains(peer)
2337 && !self.explicit_peers.contains(peer)
2338 && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2339 },
2340 );
2341
2342 debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());
2343
2344 for peer in to_msg_peers {
2345 let mut peer_message_ids = message_ids.clone();
2346
2347 if peer_message_ids.len() > self.config.max_ihave_length() {
2348 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2352 peer_message_ids.truncate(self.config.max_ihave_length());
2353 }
2354
2355 Self::control_pool_add(
2357 &mut self.control_pool,
2358 peer,
2359 GossipsubControlAction::IHave {
2360 topic_hash: topic_hash.clone(),
2361 message_ids: peer_message_ids,
2362 },
2363 );
2364 }
2365 }
2366 }
2367
2368 fn send_graft_prune(
2371 &mut self,
2372 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2373 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2374 no_px: HashSet<PeerId>,
2375 ) {
2376 for (peer, topics) in to_graft.iter() {
2378 for topic in topics {
2379 if let Some((peer_score, ..)) = &mut self.peer_score {
2381 peer_score.graft(peer, topic.clone());
2382 }
2383 }
2384 let mut control_msgs: Vec<GossipsubControlAction> = topics
2385 .iter()
2386 .map(|topic_hash| GossipsubControlAction::Graft {
2387 topic_hash: topic_hash.clone(),
2388 })
2389 .collect();
2390
2391 if let Some(topics) = to_prune.remove(peer) {
2393 let mut prunes = topics
2394 .iter()
2395 .map(|topic_hash| {
2396 self.make_prune(
2397 topic_hash,
2398 peer,
2399 self.config.do_px() && !no_px.contains(peer),
2400 )
2401 })
2402 .collect::<Vec<_>>();
2403 control_msgs.append(&mut prunes);
2404 }
2405
2406 if self
2408 .send_message(
2409 *peer,
2410 GossipsubRpc {
2411 subscriptions: Vec::new(),
2412 messages: Vec::new(),
2413 control_msgs,
2414 }
2415 .into_protobuf(),
2416 )
2417 .is_err()
2418 {
2419 error!("Failed to send control messages. Message too large");
2420 }
2421 }
2422
2423 for (peer, topics) in to_prune.iter() {
2425 let remaining_prunes = topics
2426 .iter()
2427 .map(|topic_hash| {
2428 self.make_prune(
2429 topic_hash,
2430 peer,
2431 self.config.do_px() && !no_px.contains(peer),
2432 )
2433 })
2434 .collect();
2435 if self
2436 .send_message(
2437 *peer,
2438 GossipsubRpc {
2439 subscriptions: Vec::new(),
2440 messages: Vec::new(),
2441 control_msgs: remaining_prunes,
2442 }
2443 .into_protobuf(),
2444 )
2445 .is_err()
2446 {
2447 error!("Failed to send prune messages. Message too large");
2448 }
2449 }
2450 }
2451
    fn forward_msg(
        &mut self,
        msg_id: &MessageId,
        message: RawGossipsubMessage,
        propagation_source: Option<&PeerId>,
    ) -> Result<bool, PublishError> {
        if let Some((peer_score, ..)) = &mut self.peer_score {
            if let Some(peer) = propagation_source {
                peer_score.deliver_message(peer, msg_id, &message.topic);
            }
        }

        debug!("Forwarding message: {:?}", msg_id);
        let mut recipient_peers = HashSet::new();

        let topic = &message.topic;
        if let Some(mesh_peers) = self.mesh.get(&topic) {
            for peer_id in mesh_peers {
                if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
                    recipient_peers.insert(*peer_id);
                }
            }
        }

        for p in &self.explicit_peers {
            if let Some(topics) = self.peer_topics.get(p) {
                if Some(p) != propagation_source
                    && Some(p) != message.source.as_ref()
                    && topics.contains(&message.topic)
                {
                    recipient_peers.insert(*p);
                }
            }
        }

        if !recipient_peers.is_empty() {
            let event = Arc::new(
                GossipsubRpc {
                    subscriptions: Vec::new(),
                    messages: vec![message.clone()],
                    control_msgs: Vec::new(),
                }
                .into_protobuf(),
            );

            for peer in recipient_peers.iter() {
                debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
                self.send_message(*peer, event.clone())?;
            }
            debug!("Completed forwarding message");
            Ok(true)
        } else {
            Ok(false)
        }
    }

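    /// Builds a [`RawGossipsubMessage`] for publishing according to the node's publish
    /// configuration: signing with the local keypair, attaching a fixed or random author, or
    /// omitting the author entirely for anonymous messages.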
    pub(crate) fn build_raw_message(
        &self,
        topic: TopicHash,
        data: Vec<u8>,
    ) -> Result<RawGossipsubMessage, PublishError> {
        match &self.publish_config {
            PublishConfig::Signing {
                ref keypair,
                author,
                inline_key,
            } => {
                let sequence_number: u64 = rand::random();

                let signature = {
                    let message = rpc_proto::Message {
                        from: Some(author.clone().to_bytes()),
                        data: Some(data.clone()),
                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
                        topic: topic.clone().into_string(),
                        signature: None,
                        key: None,
                    };

                    let mut buf = Vec::with_capacity(message.encoded_len());
                    message
                        .encode(&mut buf)
                        .expect("Buffer has sufficient capacity");

                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
                    signature_bytes.extend_from_slice(&buf);
                    Some(keypair.sign(&signature_bytes)?)
                };

                Ok(RawGossipsubMessage {
                    source: Some(*author),
                    data,
                    sequence_number: Some(sequence_number),
                    topic,
                    signature,
                    key: inline_key.clone(),
                    validated: true,
                })
            }
            PublishConfig::Author(peer_id) => {
                Ok(RawGossipsubMessage {
                    source: Some(*peer_id),
                    data,
                    sequence_number: Some(rand::random()),
                    topic,
                    signature: None,
                    key: None,
                    validated: true,
                })
            }
            PublishConfig::RandomAuthor => {
                Ok(RawGossipsubMessage {
                    source: Some(PeerId::random()),
                    data,
                    sequence_number: Some(rand::random()),
                    topic,
                    signature: None,
                    key: None,
                    validated: true,
                })
            }
            PublishConfig::Anonymous => {
                Ok(RawGossipsubMessage {
                    source: None,
                    data,
                    sequence_number: None,
                    topic,
                    signature: None,
                    key: None,
                    validated: true,
                })
            }
        }
    }

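    /// Queues a disconnect for `peer_id`. When `ban` is set, the peer is also recorded in the
    /// blacklist with an expiry of now plus the configured ban duration.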
    pub fn disconnect_peer(&mut self, peer_id: PeerId, ban: bool) {
        if ban {
            self.blacklisted_peers
                .insert(peer_id, Instant::now().add(self.config.ban_peer_duration().clone()));
        }
        self.events
            .push_back(NetworkBehaviourAction::DisconnectPeer { peer_id });
    }

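    /// Adds a control action to the per-peer control pool; pooled actions are sent out when the
    /// pool is flushed.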
    fn control_pool_add(
        control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
        peer: PeerId,
        control: GossipsubControlAction,
    ) {
        control_pool
            .entry(peer)
            .or_insert_with(Vec::new)
            .push(control);
    }

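    /// Drains the control pool and sends each peer its accumulated control messages in a single
    /// RPC.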
    fn flush_control_pool(&mut self) {
        for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
            if self
                .send_message(
                    peer,
                    GossipsubRpc {
                        subscriptions: Vec::new(),
                        messages: Vec::new(),
                        control_msgs: controls,
                    }
                    .into_protobuf(),
                )
                .is_err()
            {
                error!("Failed to flush control pool. Message too large");
            }
        }
    }

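    /// Sends an RPC to a peer, fragmenting it first if it exceeds the configured maximum
    /// transmit size.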
    fn send_message(
        &mut self,
        peer_id: PeerId,
        message: impl Into<Arc<rpc_proto::Rpc>>,
    ) -> Result<(), PublishError> {
        let messages = self.fragment_message(message.into())?;

        for message in messages {
            self.events
                .push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id,
                    event: message,
                    handler: NotifyHandler::Any,
                })
        }
        Ok(())
    }

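    /// Splits an RPC that exceeds `max_transmit_size` into multiple smaller RPCs, distributing
    /// published messages, subscriptions and control messages across them. Returns
    /// [`PublishError::MessageTooLarge`] if a single message or subscription cannot fit on its
    /// own.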
    fn fragment_message(
        &self,
        rpc: Arc<rpc_proto::Rpc>,
    ) -> Result<Vec<Arc<rpc_proto::Rpc>>, PublishError> {
        if rpc.encoded_len() < self.config.max_transmit_size() {
            return Ok(vec![rpc]);
        }

        let new_rpc = rpc_proto::Rpc {
            subscriptions: Vec::new(),
            publish: Vec::new(),
            control: None,
        };

        let mut rpc_list = vec![new_rpc.clone()];

        macro_rules! create_or_add_rpc {
            ($object_size:ident) => {
                let list_index = rpc_list.len() - 1;
                if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize
                    > self.config.max_transmit_size()
                    && rpc_list[list_index] != new_rpc
                {
                    rpc_list.push(new_rpc.clone());
                }
            };
        }

        macro_rules! add_item {
            ($object:ident, $type:ident) => {
                let object_size = $object.encoded_len();

                if object_size + 2 > self.config.max_transmit_size() {
                    error!("Individual message too large to fragment");
                    return Err(PublishError::MessageTooLarge);
                }

                create_or_add_rpc!(object_size);
                rpc_list
                    .last_mut()
                    .expect("Must have at least one element")
                    .$type
                    .push($object.clone());
            };
        }

        for message in &rpc.publish {
            add_item!(message, publish);
        }
        for subscription in &rpc.subscriptions {
            add_item!(subscription, subscriptions);
        }

        let empty_control = rpc_proto::ControlMessage::default();
        if let Some(control) = rpc.control.as_ref() {
            if control.encoded_len() + 2 > self.config.max_transmit_size() {
                for ihave in &control.ihave {
                    let len = ihave.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .ihave
                        .push(ihave.clone());
                }
                for iwant in &control.iwant {
                    let len = iwant.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .iwant
                        .push(iwant.clone());
                }
                for graft in &control.graft {
                    let len = graft.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .graft
                        .push(graft.clone());
                }
                for prune in &control.prune {
                    let len = prune.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .prune
                        .push(prune.clone());
                }
            } else {
                let len = control.encoded_len();
                create_or_add_rpc!(len);
                rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
            }
        }

        Ok(rpc_list.into_iter().map(Arc::new).collect())
    }

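    /// Serializes up to `PEER_EXCHANGE_NUMBER_LIMIT` known, non-blacklisted peer ids and sends
    /// them to `peer_id` as a message on the dedicated `PEER_TOPIC` topic.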
    fn send_peer_list(&mut self, peer_id: &PeerId) -> Result<(), PublishError> {
        let mut ser = SimplePushSerializer::new(1);

        let peers = self.peer_topics.keys().cloned()
            .filter(|p| !self.blacklisted_peers.contains_key(p))
            .collect::<Vec<PeerId>>();
        let len = std::cmp::min(peers.len(), PEER_EXCHANGE_NUMBER_LIMIT);
        ser.push_u16(len as u16);

        for i in 0..len {
            ser.push_vec(&peers[i].to_bytes());
        }

        let data = ser.to_vec();

        let peer_topic: Topic<IdentityHash> = Topic::new(PEER_TOPIC);

        let transformed_data = self
            .data_transform
            .outbound_transform(&peer_topic.hash(), data.clone())?;

        let raw_message = self.build_raw_message(peer_topic.into(), transformed_data)?;

        let event = Arc::new(
            GossipsubRpc {
                subscriptions: Vec::new(),
                messages: vec![raw_message.clone()],
                control_msgs: Vec::new(),
            }
            .into_protobuf(),
        );

        self.send_message(peer_id.clone(), event.clone())
    }
}

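/// Extracts the first IPv4 or IPv6 address found in a multiaddress, if any.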
fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
    addr.iter().find_map(|p| match p {
        Ip4(addr) => Some(IpAddr::V4(addr)),
        Ip6(addr) => Some(IpAddr::V6(addr)),
        _ => None,
    })
}

impl<C, F> NetworkBehaviour for Gossipsub<C, F>
where
    C: Send + 'static + DataTransform,
    F: Send + 'static + TopicSubscriptionFilter,
{
    type ProtocolsHandler = GossipsubHandler;
    type OutEvent = GossipsubEvent;

    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        GossipsubHandler::new(
            self.config.protocol_id_prefix().clone(),
            self.config.max_transmit_size(),
            self.config.validation_mode().clone(),
            self.config.support_floodsub(),
        )
    }

    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
        Vec::new()
    }

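    /// Called when a new peer connects: blacklisted peers are disconnected immediately; otherwise
    /// the peer is sent our current subscriptions plus a peer-exchange list, and is registered
    /// with the peer-score system. It is initially assumed to be a floodsub peer until the
    /// handler reports its protocol.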
    fn inject_connected(&mut self, peer_id: &PeerId) {
        if self.blacklisted_peers.contains_key(peer_id) {
            debug!("Ignoring connection from blacklisted peer: {}", peer_id);
            self.disconnect_peer(peer_id.clone(), true);
            return;
        }

        info!("New peer connected: {}", peer_id);
        let mut subscriptions = vec![];
        for topic_hash in self.mesh.keys() {
            subscriptions.push(GossipsubSubscription {
                topic_hash: topic_hash.clone(),
                action: GossipsubSubscriptionAction::Subscribe,
            });
        }

        if !subscriptions.is_empty() {
            if self
                .send_message(
                    *peer_id,
                    GossipsubRpc {
                        messages: Vec::new(),
                        subscriptions,
                        control_msgs: Vec::new(),
                    }
                    .into_protobuf(),
                )
                .is_err()
            {
                error!("Failed to send subscriptions, message too large");
            }
        }

        if self.send_peer_list(peer_id).is_err() {
            error!("Failed to send peer list on connect");
        }

        self.peer_topics.insert(*peer_id, Default::default());

        self.peer_protocols
            .entry(*peer_id)
            .or_insert(PeerKind::Floodsub);

        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.add_peer(*peer_id);
        }
    }

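    /// Called when a peer disconnects: removes it from all topic, mesh, fanout and peer-exchange
    /// bookkeeping and from the peer-score system.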
    fn inject_disconnected(&mut self, peer_id: &PeerId) {
        debug!("Peer disconnected: {}", peer_id);
        {
            let topics = match self.peer_topics.get(peer_id) {
                Some(topics) => topics,
                None => {
                    if !self.blacklisted_peers.contains_key(peer_id) {
                        debug!("Disconnected peer {} was not tracked in peer_topics", peer_id);
                    }
                    return;
                }
            };

            for topic in topics {
                if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
                    mesh_peers.remove(peer_id);
                }

                if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
                    if !peer_list.remove(peer_id) {
                        warn!(
                            "Disconnected node: {} not in topic_peers peer list",
                            peer_id
                        );
                    }
                } else {
                    warn!(
                        "Disconnected node: {} with topic: {:?} not in topic_peers",
                        &peer_id, &topic
                    );
                }

                self.fanout
                    .get_mut(&topic)
                    .map(|peers| peers.remove(peer_id));
            }

            self.px_peers.remove(peer_id);
            self.outbound_peers.remove(peer_id);
        }

        self.peer_topics.remove(peer_id);
        self.peer_protocols.remove(peer_id);

        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.remove_peer(peer_id);
        }
    }

    fn inject_connection_established(
        &mut self,
        peer_id: &PeerId,
        _: &ConnectionId,
        endpoint: &ConnectedPoint,
    ) {
        if self.config.accept_dalek_pk_peers_only() && peer_id.as_dalek_pubkey().is_err() {
            warn!("Rejecting peer {} because it is not identified by a Dalek PK", peer_id);
            self.disconnect_peer(*peer_id, true);
            return;
        }

        if self.blacklisted_peers.contains_key(peer_id) {
            debug!("Rejecting peer {} because it is blacklisted", peer_id);
            self.disconnect_peer(*peer_id, true);
            return;
        }

        if let ConnectedPoint::Dialer { .. } = endpoint {
            if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
                self.outbound_peers.insert(*peer_id);
            }
        }

        if let Some((peer_score, ..)) = &mut self.peer_score {
            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
                peer_score.add_ip(&peer_id, ip);
            } else {
                trace!(
                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
                    peer_id,
                    endpoint
                )
            }
        }
    }

    fn inject_connection_closed(
        &mut self,
        peer: &PeerId,
        _: &ConnectionId,
        endpoint: &ConnectedPoint,
    ) {
        if let Some((peer_score, ..)) = &mut self.peer_score {
            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
                peer_score.remove_ip(peer, &ip);
            } else {
                trace!(
                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
                    peer,
                    endpoint
                )
            }
        }
    }

    fn inject_address_change(
        &mut self,
        peer: &PeerId,
        _: &ConnectionId,
        endpoint_old: &ConnectedPoint,
        endpoint_new: &ConnectedPoint,
    ) {
        if let Some((peer_score, ..)) = &mut self.peer_score {
            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
                peer_score.remove_ip(peer, &ip);
            } else {
                trace!(
                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
                    peer,
                    endpoint_old
                )
            }
            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
                peer_score.add_ip(&peer, ip);
            } else {
                trace!(
                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
                    peer,
                    endpoint_new
                )
            }
        }
    }

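    /// Handles events from the connection handler: records the negotiated peer kind, drops RPCs
    /// from peers below the graylist score threshold, processes received messages and
    /// subscriptions, and dispatches IHAVE/IWANT/GRAFT/PRUNE control messages to their handlers.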
    fn inject_event(
        &mut self,
        propagation_source: PeerId,
        _: ConnectionId,
        handler_event: HandlerEvent,
    ) {
        match handler_event {
            HandlerEvent::PeerKind(kind) => {
                if let PeerKind::NotSupported = kind {
                    debug!(
                        "Peer does not support gossipsub protocols. {}",
                        propagation_source
                    );
                    self.inject_disconnected(&propagation_source);
                } else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) {
                    debug!(
                        "New peer type found: {} for peer: {}",
                        kind, propagation_source
                    );
                    if let PeerKind::Floodsub = *old_kind {
                        *old_kind = kind;
                    }
                }
            }
            HandlerEvent::Message {
                rpc,
                invalid_messages,
            } => {
                if !rpc.subscriptions.is_empty() {
                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
                }

                if let (true, _) =
                    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
                {
                    debug!("RPC Dropped from greylisted peer {}", propagation_source);
                    return;
                }

                if self.peer_score.is_some() {
                    for (raw_message, validation_error) in invalid_messages {
                        self.handle_invalid_message(
                            &propagation_source,
                            raw_message,
                            validation_error,
                        )
                    }
                } else {
                    for (message, validation_error) in invalid_messages {
                        warn!(
                            "Invalid message. Reason: {:?} propagation_peer {} source {:?}",
                            validation_error,
                            propagation_source.to_string(),
                            message.source
                        );
                    }
                }

                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
                    if self.config.max_messages_per_rpc().is_some()
                        && Some(count) >= self.config.max_messages_per_rpc()
                    {
                        warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
                        break;
                    }
                    self.handle_received_message(raw_message, &propagation_source);
                }

                let mut ihave_msgs = vec![];
                let mut graft_msgs = vec![];
                let mut prune_msgs = vec![];
                for control_msg in rpc.control_msgs {
                    match control_msg {
                        GossipsubControlAction::IHave {
                            topic_hash,
                            message_ids,
                        } => {
                            ihave_msgs.push((topic_hash, message_ids));
                        }
                        GossipsubControlAction::IWant { message_ids } => {
                            self.handle_iwant(&propagation_source, message_ids)
                        }
                        GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
                        GossipsubControlAction::Prune {
                            topic_hash,
                            peers,
                            backoff,
                        } => prune_msgs.push((topic_hash, peers, backoff)),
                    }
                }
                if !ihave_msgs.is_empty() {
                    self.handle_ihave(&propagation_source, ihave_msgs);
                }
                if !graft_msgs.is_empty() {
                    self.handle_graft(&propagation_source, graft_msgs);
                }
                if !prune_msgs.is_empty() {
                    self.handle_prune(&propagation_source, prune_msgs);
                }
            }
        }
    }

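    /// Drives the behaviour: drains queued actions (unwrapping shared RPCs for the handler),
    /// refreshes peer scores on their interval, and runs the heartbeat when its timer fires.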
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
        _: &mut impl PollParameters,
    ) -> Poll<
        NetworkBehaviourAction<
            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
            Self::OutEvent,
        >,
    > {
        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(match event {
                NetworkBehaviourAction::NotifyHandler {
                    peer_id,
                    handler,
                    event: send_event,
                } => {
                    let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone());
                    NetworkBehaviourAction::NotifyHandler {
                        peer_id,
                        event,
                        handler,
                    }
                }
                NetworkBehaviourAction::GenerateEvent(e) => {
                    NetworkBehaviourAction::GenerateEvent(e)
                }
                NetworkBehaviourAction::DialAddress { address } => {
                    NetworkBehaviourAction::DialAddress { address }
                }
                NetworkBehaviourAction::DialPeer { peer_id, condition } => {
                    NetworkBehaviourAction::DialPeer { peer_id, condition }
                }
                NetworkBehaviourAction::ReportObservedAddr { address, score } => {
                    NetworkBehaviourAction::ReportObservedAddr { address, score }
                }
                NetworkBehaviourAction::DisconnectPeer { peer_id } => {
                    NetworkBehaviourAction::DisconnectPeer { peer_id }
                }
            });
        }

        if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
            while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) {
                peer_score.refresh_scores();
            }
        }

        while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
            self.heartbeat();
        }

        Poll::Pending
    }
}

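/// Helper to select a random set of gossipsub-capable peers for a topic, filtered by `f`. The
/// number of peers to pick is computed by `n_map` from the number of eligible peers.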
fn get_random_peers_dynamic(
    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
    peer_protocols: &HashMap<PeerId, PeerKind>,
    topic_hash: &TopicHash,
    n_map: impl Fn(usize) -> usize,
    mut f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
    let mut gossip_peers = match topic_peers.get(topic_hash) {
        Some(peer_list) => peer_list
            .iter()
            .cloned()
            .filter(|p| {
                f(p) && match peer_protocols.get(p) {
                    Some(PeerKind::Gossipsub) => true,
                    Some(PeerKind::Gossipsubv1_1) => true,
                    _ => false,
                }
            })
            .collect(),
        None => Vec::new(),
    };

    let n = n_map(gossip_peers.len());
    if gossip_peers.len() <= n {
        debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
        return gossip_peers.into_iter().collect();
    }

    let mut rng = thread_rng();
    gossip_peers.partial_shuffle(&mut rng, n);

    debug!("RANDOM PEERS: Got {:?} peers", n);

    gossip_peers.into_iter().take(n).collect()
}

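/// Helper to select a fixed number of random gossipsub-capable peers for a topic, filtered by
/// `f`.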
fn get_random_peers(
    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
    peer_protocols: &HashMap<PeerId, PeerKind>,
    topic_hash: &TopicHash,
    n: usize,
    f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
    get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
}

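/// Validates that the combination of [`MessageAuthenticity`] and [`ValidationMode`] is
/// consistent, e.g. that signing is not enabled together with anonymous validation.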
fn validate_config(
    authenticity: &MessageAuthenticity,
    validation_mode: &ValidationMode,
) -> Result<(), &'static str> {
    match validation_mode {
        ValidationMode::Anonymous => {
            if authenticity.is_signing() {
                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
            }

            if !authenticity.is_anonymous() {
                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
            }
        }
        ValidationMode::Strict => {
            if !authenticity.is_signing() {
                return Err(
                    "Messages will be published unsigned and incoming unsigned messages will be rejected. Consider adjusting the validation or privacy settings in the config"
                );
            }
        }
        _ => {}
    }
    Ok(())
}

impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Gossipsub")
            .field("config", &self.config)
            .field("events", &self.events)
            .field("control_pool", &self.control_pool)
            .field("publish_config", &self.publish_config)
            .field("topic_peers", &self.topic_peers)
            .field("peer_topics", &self.peer_topics)
            .field("explicit_peers", &self.explicit_peers)
            .field("blacklisted_peers", &self.blacklisted_peers)
            .field("mesh", &self.mesh)
            .field("fanout", &self.fanout)
            .field("fanout_last_pub", &self.fanout_last_pub)
            .field("mcache", &self.mcache)
            .field("heartbeat", &self.heartbeat)
            .field("px_peers", &self.px_peers)
            .field("outbound_peers", &self.outbound_peers)
            .field("count_received_ihave", &self.count_received_ihave)
            .field("count_sent_iwant", &self.count_sent_iwant)
            .finish()
    }
}

impl fmt::Debug for PublishConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublishConfig::Signing { author, .. } => {
                f.write_fmt(format_args!("PublishConfig::Signing({})", author))
            }
            PublishConfig::Author(author) => {
                f.write_fmt(format_args!("PublishConfig::Author({})", author))
            }
            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
        }
    }
}

#[cfg(test)]
mod local_test {
    use super::*;
    use crate::IdentTopic;
    use asynchronous_codec::Encoder;
    use quickcheck::*;
    use rand::Rng;

    fn empty_rpc() -> GossipsubRpc {
        GossipsubRpc {
            subscriptions: Vec::new(),
            messages: Vec::new(),
            control_msgs: Vec::new(),
        }
    }

    fn test_message() -> RawGossipsubMessage {
        RawGossipsubMessage {
            source: Some(PeerId::random()),
            data: vec![0; 100],
            sequence_number: None,
            topic: TopicHash::from_raw("test_topic"),
            signature: None,
            key: None,
            validated: false,
        }
    }

    fn test_subscription() -> GossipsubSubscription {
        GossipsubSubscription {
            action: GossipsubSubscriptionAction::Subscribe,
            topic_hash: IdentTopic::new("TestTopic").hash(),
        }
    }

    fn test_control() -> GossipsubControlAction {
        GossipsubControlAction::IHave {
            topic_hash: IdentTopic::new("TestTopic").hash(),
            message_ids: vec![MessageId(vec![12u8]); 5],
        }
    }

    impl Arbitrary for GossipsubRpc {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let mut rpc = empty_rpc();

            for _ in 0..g.gen_range(0, 10) {
                rpc.subscriptions.push(test_subscription());
            }
            for _ in 0..g.gen_range(0, 10) {
                rpc.messages.push(test_message());
            }
            for _ in 0..g.gen_range(0, 10) {
                rpc.control_msgs.push(test_control());
            }
            rpc
        }
    }

    #[test]
    fn test_message_fragmentation_deterministic() {
        let max_transmit_size = 500;
        let config = crate::GossipsubConfigBuilder::default()
            .max_transmit_size(max_transmit_size)
            .validation_mode(ValidationMode::Permissive)
            .build()
            .unwrap();
        let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();

        let mut rpc = empty_rpc();
        rpc.messages.push(test_message());

        let mut rpc_proto = rpc.clone().into_protobuf();
        let fragmented_messages = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap();
        assert_eq!(
            fragmented_messages,
            vec![Arc::new(rpc_proto.clone())],
            "Messages under the limit shouldn't be fragmented"
        );

        while rpc_proto.encoded_len() < max_transmit_size {
            rpc.messages.push(test_message());
            rpc_proto = rpc.clone().into_protobuf();
        }

        let fragmented_messages = gs
            .fragment_message(Arc::new(rpc_proto))
            .expect("Should be able to fragment the messages");

        assert!(
            fragmented_messages.len() > 1,
            "the message should be fragmented"
        );

        for message in fragmented_messages {
            assert!(
                message.encoded_len() < max_transmit_size,
                "all messages should be less than the transmission size"
            );
        }
    }

    #[test]
    fn test_message_fragmentation() {
        fn prop(rpc: GossipsubRpc) {
            let max_transmit_size = 500;
            let config = crate::GossipsubConfigBuilder::default()
                .max_transmit_size(max_transmit_size)
                .validation_mode(ValidationMode::Permissive)
                .build()
                .unwrap();
            let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();

            let mut length_codec = unsigned_varint::codec::UviBytes::default();
            length_codec.set_max_len(max_transmit_size);
            let mut codec =
                crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);

            let rpc_proto = rpc.into_protobuf();
            let fragmented_messages = gs
                .fragment_message(Arc::new(rpc_proto.clone()))
                .expect("Messages must be valid");

            if rpc_proto.encoded_len() < max_transmit_size {
                assert_eq!(
                    fragmented_messages.len(),
                    1,
                    "the message should not be fragmented"
                );
            } else {
                assert!(
                    fragmented_messages.len() > 1,
                    "the message should be fragmented"
                );
            }

            for message in fragmented_messages {
                assert!(
                    message.encoded_len() < max_transmit_size,
                    "all messages should be less than the transmission size: list size {} max size {}",
                    message.encoded_len(),
                    max_transmit_size
                );

                let mut buf = bytes::BytesMut::with_capacity(message.encoded_len());
                codec
                    .encode(Arc::try_unwrap(message).unwrap(), &mut buf)
                    .unwrap()
            }
        }
        QuickCheck::new()
            .max_tests(100)
            .quickcheck(prop as fn(_) -> _)
    }
}