cs_mwc_libp2p_gossipsub/
behaviour.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    cmp::{max, Ordering},
23    collections::HashSet,
24    collections::VecDeque,
25    collections::{BTreeSet, HashMap},
26    fmt,
27    iter::FromIterator,
28    net::IpAddr,
29    sync::Arc,
30    task::{Context, Poll},
31    time::Duration,
32};
33
34use futures::StreamExt;
35use log::{debug, error, info, trace, warn};
36use prost::Message;
37use rand::{seq::SliceRandom, seq::IteratorRandom, thread_rng, RngCore};
38use wasm_timer::{Instant, Interval};
39
40use mwc_libp2p_core::{
41    connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
42    multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, SimplePushSerializer
43};
44use mwc_libp2p_swarm::{
45    DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
46    ProtocolsHandler,
47};
48
49use crate::backoff::BackoffStorage;
50use crate::config::{GossipsubConfig, ValidationMode};
51use crate::error::{PublishError, SubscriptionError, ValidationError};
52use crate::gossip_promises::GossipPromises;
53use crate::handler::{GossipsubHandler, HandlerEvent};
54use crate::mcache::MessageCache;
55use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
56use crate::protocol::SIGNING_PREFIX;
57use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
58use crate::time_cache::{DuplicateCache, TimeCache};
59use crate::topic::{Hasher, Topic, TopicHash, IdentityHash};
60use crate::transform::{DataTransform, IdentityTransform};
61use crate::types::{
62    FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
63    GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
64};
65use crate::types::{GossipsubRpc, PeerKind};
66use crate::{rpc_proto, TopicScoreParams};
67use std::{cmp::Ordering::Equal, fmt::Debug};
68use std::ops::Add;
69
70#[cfg(test)]
71mod tests;
72
/// Name of the topic on which a node exchanges peers with other nodes.
pub static PEER_TOPIC: &str = "Peers";
/// Maximum number of peers in a peer list.
// NOTE(review): judging by the name this bounds peer-exchange lists; the use site is not
// visible here — confirm.
pub const  PEER_EXCHANGE_NUMBER_LIMIT : usize = 20;
77
/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`GossipsubConfig`] to allow for unsigned messages.
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Message signing is enabled. The author will be the owner of the key and the sequence number
    /// will be a random number.
    Signed(Keypair),
    /// Message signing is disabled.
    ///
    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
    /// number will be randomized.
    Author(PeerId),
    /// Message signing is disabled.
    ///
    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
    /// randomized.
    RandomAuthor,
    /// Message signing is disabled.
    ///
    /// The author of the message and the sequence numbers are excluded from the message.
    ///
    /// NOTE: Excluding these fields may cause other nodes that enforce validation of these
    /// fields to treat the messages as invalid. See [`ValidationMode`] in the
    /// [`GossipsubConfig`] for how to customise this for rust-libp2p gossipsub. A custom
    /// `message_id` function will need to be set to prevent all messages from a peer being
    /// filtered as duplicates.
    Anonymous,
}
110
111impl MessageAuthenticity {
112    /// Returns true if signing is enabled.
113    pub fn is_signing(&self) -> bool {
114        matches!(self, MessageAuthenticity::Signed(_))
115    }
116
117    pub fn is_anonymous(&self) -> bool {
118        matches!(self, MessageAuthenticity::Anonymous)
119    }
120}
121
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum GossipsubEvent {
    /// A message has been received.
    Message {
        /// The peer that forwarded us this message.
        propagation_source: PeerId,
        /// The [`MessageId`] of the message. This should be referenced by the application when
        /// validating a message (if required).
        message_id: MessageId,
        /// The decompressed message itself.
        message: GossipsubMessage,
    },
    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: TopicHash,
    },
    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: TopicHash,
    },
}
150
/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
/// for further details.
///
/// Each variant is built from the [`MessageAuthenticity`] variant of the matching name.
enum PublishConfig {
    /// Publishing with message signing enabled.
    Signing {
        /// The key pair supplied through [`MessageAuthenticity::Signed`].
        keypair: Keypair,
        /// The [`PeerId`] derived from the public key of `keypair`.
        author: PeerId,
        /// The protobuf encoding of the public key, stored only when that encoding is longer
        /// than 42 bytes; `None` otherwise.
        inline_key: Option<Vec<u8>>,
    },
    /// Publishing unsigned messages with a fixed author.
    Author(PeerId),
    /// Counterpart of [`MessageAuthenticity::RandomAuthor`].
    RandomAuthor,
    /// Counterpart of [`MessageAuthenticity::Anonymous`].
    Anonymous,
}
163
164impl PublishConfig {
165    pub fn get_own_id(&self) -> Option<&PeerId> {
166        match self {
167            Self::Signing { author, .. } => Some(&author),
168            Self::Author(author) => Some(&author),
169            _ => None,
170        }
171    }
172}
173
174impl From<MessageAuthenticity> for PublishConfig {
175    fn from(authenticity: MessageAuthenticity) -> Self {
176        match authenticity {
177            MessageAuthenticity::Signed(keypair) => {
178                let public_key = keypair.public();
179                let key_enc = public_key.clone().into_protobuf_encoding();
180                let key = if key_enc.len() <= 42 {
181                    // The public key can be inlined in [`rpc_proto::Message::from`], so we don't include it
182                    // specifically in the [`rpc_proto::Message::key`] field.
183                    None
184                } else {
185                    // Include the protobuf encoding of the public key in the message.
186                    Some(key_enc)
187                };
188
189                PublishConfig::Signing {
190                    keypair,
191                    author: public_key.into_peer_id(),
192                    inline_key: key,
193                }
194            }
195            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
196            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
197            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
198        }
199    }
200}
201
/// [`NetworkBehaviourAction`] specialised with `Arc<rpc_proto::Rpc>` and [`GossipsubEvent`];
/// this is the element type of the behaviour's queue of pending events.
type GossipsubNetworkBehaviourAction = NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GossipsubEvent>;
203
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
/// The [`DataTransform`] trait allows applications to optionally add extra encoding/decoding
/// functionality to the underlying messages. This is intended for custom compression algorithms.
///
/// The [`TopicSubscriptionFilter`] allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Gossipsub<
    D: DataTransform = IdentityTransform,
    F: TopicSubscriptionFilter = AllowAllSubscriptionFilter,
> {
    /// Configuration providing gossipsub performance parameters.
    config: GossipsubConfig,

    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<GossipsubNetworkBehaviourAction>,

    /// Pools non-urgent control messages between heartbeats.
    control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,

    /// Information used for publishing messages.
    publish_config: PublishConfig,

    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
    /// duplicates from being propagated to the application and on the network.
    duplicate_cache: DuplicateCache<MessageId>,

    /// A map of peers to their protocol kind. This is to identify different kinds of gossipsub
    /// peers.
    peer_protocols: HashMap<PeerId, PeerKind>,

    /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
    topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// A map of all connected peers to their subscribed topics.
    peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,

    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
    /// forward messages to, outside of the scoring system.
    explicit_peers: HashSet<PeerId>,

    /// A map of peers that have been blacklisted by the user, each with the instant stored for
    /// its ban. Messages are not sent to and are rejected from these peers, and connections to
    /// them are not kept either.
    /// Bans are time-limited so that a peer banned by accident is not banned forever.
    blacklisted_peers: HashMap<PeerId, Instant>,

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Storage for backoffs.
    backoffs: BackoffStorage,

    /// Message cache for the last few heartbeats.
    mcache: MessageCache,

    /// Heartbeat interval stream.
    heartbeat: Interval,

    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
    /// clean up -- eg backoff clean up.
    heartbeat_ticks: u64,

    /// We remember all peers we found through peer exchange, since those peers are not considered
    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
    /// implementation to avoid possible love bombing attacks in PX. When disconnecting, peers
    /// will be removed from this list, which may result in a true outbound rediscovery.
    px_peers: HashSet<PeerId>,

    /// Set of connected outbound peers (we only consider true outbound peers found through
    /// discovery and not by PX).
    outbound_peers: HashSet<PeerId>,

    /// Stores optional peer score data together with thresholds, decay interval and gossip
    /// promises.
    peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,

    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// Short term cache for published message ids. This is used for penalizing peers sending
    /// our own messages back if the messages are anonymous or use a random author.
    published_message_ids: DuplicateCache<MessageId>,

    /// Short term cache for fast message ids, mapping them to the real message ids.
    fast_messsage_id_cache: TimeCache<FastMessageId, MessageId>,

    /// The filter used to handle message subscriptions.
    subscription_filter: F,

    /// A general transformation function that can be applied to data received from the wire before
    /// calculating the message-id and sending to the application. This is designed to allow the
    /// user to implement arbitrary topic-based compression algorithms.
    data_transform: D,
}
312
313impl<D, F> Gossipsub<D, F>
314where
315    D: DataTransform + Default,
316    F: TopicSubscriptionFilter + Default,
317{
318    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
319    /// [`GossipsubConfig`]. This has no subscription filter and uses no compression.
320    pub fn new(
321        privacy: MessageAuthenticity,
322        config: GossipsubConfig,
323    ) -> Result<Self, &'static str> {
324        Self::new_with_subscription_filter_and_transform(
325            privacy,
326            config,
327            F::default(),
328            D::default(),
329        )
330    }
331}
332
333impl<D, F> Gossipsub<D, F>
334where
335    D: DataTransform + Default,
336    F: TopicSubscriptionFilter,
337{
338    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
339    /// [`GossipsubConfig`] and a custom subscription filter.
340    pub fn new_with_subscription_filter(
341        privacy: MessageAuthenticity,
342        config: GossipsubConfig,
343        subscription_filter: F,
344    ) -> Result<Self, &'static str> {
345        Self::new_with_subscription_filter_and_transform(
346            privacy,
347            config,
348            subscription_filter,
349            D::default(),
350        )
351    }
352}
353
354impl<D, F> Gossipsub<D, F>
355where
356    D: DataTransform,
357    F: TopicSubscriptionFilter + Default,
358{
359    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
360    /// [`GossipsubConfig`] and a custom data transform.
361    pub fn new_with_transform(
362        privacy: MessageAuthenticity,
363        config: GossipsubConfig,
364        data_transform: D,
365    ) -> Result<Self, &'static str> {
366        Self::new_with_subscription_filter_and_transform(
367            privacy,
368            config,
369            F::default(),
370            data_transform,
371        )
372    }
373}
374
375impl<D, F> Gossipsub<D, F>
376where
377    D: DataTransform,
378    F: TopicSubscriptionFilter,
379{
380    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
381    /// [`GossipsubConfig`] and a custom subscription filter and data transform.
382    pub fn new_with_subscription_filter_and_transform(
383        privacy: MessageAuthenticity,
384        config: GossipsubConfig,
385        subscription_filter: F,
386        data_transform: D,
387    ) -> Result<Self, &'static str> {
388        // Set up the router given the configuration settings.
389
390        // We do not allow configurations where a published message would also be rejected if it
391        // were received locally.
392        validate_config(&privacy, &config.validation_mode())?;
393
394        // Set up message publishing parameters.
395
396        Ok(Gossipsub {
397            events: VecDeque::new(),
398            control_pool: HashMap::new(),
399            publish_config: privacy.into(),
400            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
401            fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
402            topic_peers: HashMap::new(),
403            peer_topics: HashMap::new(),
404            explicit_peers: HashSet::new(),
405            blacklisted_peers: HashMap::new(),
406            mesh: HashMap::new(),
407            fanout: HashMap::new(),
408            fanout_last_pub: HashMap::new(),
409            backoffs: BackoffStorage::new(
410                &config.prune_backoff(),
411                config.heartbeat_interval(),
412                config.backoff_slack(),
413            ),
414            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
415            heartbeat: Interval::new_at(
416                Instant::now() + config.heartbeat_initial_delay(),
417                config.heartbeat_interval(),
418            ),
419            heartbeat_ticks: 0,
420            px_peers: HashSet::new(),
421            outbound_peers: HashSet::new(),
422            peer_score: None,
423            count_received_ihave: HashMap::new(),
424            count_sent_iwant: HashMap::new(),
425            peer_protocols: HashMap::new(),
426            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
427            config,
428            subscription_filter,
429            data_transform,
430        })
431    }
432}
433
434impl<D, F> Gossipsub<D, F>
435where
436    D: DataTransform,
437    F: TopicSubscriptionFilter,
438{
439    /// Lists the hashes of the topics we are currently subscribed to.
440    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
441        self.mesh.keys()
442    }
443
444    /// Lists all mesh peers for a certain topic hash.
445    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
446        self.mesh
447            .get(topic_hash)
448            .into_iter()
449            .map(|x| x.iter())
450            .flatten()
451    }
452
453    /// Lists all mesh peers for all topics.
454    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
455        let mut res = BTreeSet::new();
456        for peers in self.mesh.values() {
457            res.extend(peers);
458        }
459        res.into_iter()
460    }
461
462    /// Lists all known peers and their associated subscribed topics.
463    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
464        self.peer_topics
465            .iter()
466            .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect()))
467    }
468
469    /// Lists all known peers and their associated protocol.
470    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
471        self.peer_protocols.iter()
472    }
473
474    /// Returns the gossipsub score for a given peer, if one exists.
475    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
476        self.peer_score
477            .as_ref()
478            .map(|(score, ..)| score.score(peer_id))
479    }
480
481    /// Subscribe to a topic.
482    ///
483    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
484    /// subscribed.
485    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
486        debug!("Subscribing to topic: {}", topic);
487        let topic_hash = topic.hash();
488        if !self.subscription_filter.can_subscribe(&topic_hash) {
489            return Err(SubscriptionError::NotAllowed);
490        }
491
492        if self.mesh.get(&topic_hash).is_some() {
493            debug!("Topic: {} is already in the mesh.", topic);
494            return Ok(false);
495        }
496
497        // send subscription request to all peers
498        let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
499        if !peer_list.is_empty() {
500            let event = Arc::new(
501                GossipsubRpc {
502                    messages: Vec::new(),
503                    subscriptions: vec![GossipsubSubscription {
504                        topic_hash: topic_hash.clone(),
505                        action: GossipsubSubscriptionAction::Subscribe,
506                    }],
507                    control_msgs: Vec::new(),
508                }
509                .into_protobuf(),
510            );
511
512            for peer in peer_list {
513                debug!("Sending SUBSCRIBE to peer: {:?}", peer);
514                self.send_message(peer, event.clone())
515                    .map_err(SubscriptionError::PublishError)?;
516            }
517        }
518
519        // call JOIN(topic)
520        // this will add new peers to the mesh for the topic
521        self.join(&topic_hash);
522        info!("Subscribed to topic: {}", topic);
523        Ok(true)
524    }
525
526    /// Unsubscribes from a topic.
527    ///
528    /// Returns [`Ok(true)`] if we were subscribed to this topic.
529    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, PublishError> {
530        debug!("Unsubscribing from topic: {}", topic);
531        let topic_hash = topic.hash();
532
533        if self.mesh.get(&topic_hash).is_none() {
534            debug!("Already unsubscribed from topic: {:?}", topic_hash);
535            // we are not subscribed
536            return Ok(false);
537        }
538
539        // announce to all peers
540        let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
541        if !peer_list.is_empty() {
542            let event = Arc::new(
543                GossipsubRpc {
544                    messages: Vec::new(),
545                    subscriptions: vec![GossipsubSubscription {
546                        topic_hash: topic_hash.clone(),
547                        action: GossipsubSubscriptionAction::Unsubscribe,
548                    }],
549                    control_msgs: Vec::new(),
550                }
551                .into_protobuf(),
552            );
553
554            for peer in peer_list {
555                debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
556                self.send_message(peer, event.clone())?;
557            }
558        }
559
560        // call LEAVE(topic)
561        // this will remove the topic from the mesh
562        self.leave(&topic_hash);
563
564        info!("Unsubscribed from topic: {:?}", topic_hash);
565        Ok(true)
566    }
567
568    /// Publishes a message with multiple topics to the network.
569    pub fn publish<H: Hasher>(
570        &mut self,
571        topic: Topic<H>,
572        data: impl Into<Vec<u8>>,
573    ) -> Result<MessageId, PublishError> {
574        let data = data.into();
575
576        // Transform the data before building a raw_message.
577        let transformed_data = self
578            .data_transform
579            .outbound_transform(&topic.hash(), data.clone())?;
580
581        let raw_message = self.build_raw_message(topic.into(), transformed_data)?;
582
583        // calculate the message id from the un-transformed data
584        let msg_id = self.config.message_id(&GossipsubMessage {
585            source: raw_message.source,
586            data: data.clone(), // the uncompressed form
587            sequence_number: raw_message.sequence_number,
588            topic: raw_message.topic.clone(),
589        });
590
591        let event = Arc::new(
592            GossipsubRpc {
593                subscriptions: Vec::new(),
594                messages: vec![raw_message.clone()],
595                control_msgs: Vec::new(),
596            }
597            .into_protobuf(),
598        );
599
600        // check that the size doesn't exceed the max transmission size
601        if event.encoded_len() > self.config.max_transmit_size() {
602            return Err(PublishError::MessageTooLarge);
603        }
604
605        // Check the if the message has been published before
606        if self.duplicate_cache.contains(&msg_id) {
607            // This message has already been seen. We don't re-publish messages that have already
608            // been published on the network.
609            warn!(
610                "Not publishing a message that has already been published. Msg-id {}",
611                msg_id
612            );
613            return Err(PublishError::Duplicate);
614        }
615
616        debug!("Publishing message: Message_id: {:?}  Content: {}", msg_id, String::from_utf8_lossy(&data) );
617
618        let topic_hash = raw_message.topic.clone();
619
620        // If we are not flood publishing forward the message to mesh peers.
621        let mesh_peers_sent =
622            !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;
623
624        let mut recipient_peers = HashSet::new();
625        if let Some(set) = self.topic_peers.get(&topic_hash) {
626            if self.config.flood_publish() {
627                // Forward to all peers above score and all explicit peers
628                recipient_peers.extend(
629                    set.iter()
630                        .filter(|p| {
631                            self.explicit_peers.contains(*p)
632                                || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
633                        })
634                        .cloned(),
635                );
636            } else {
637                // Explicit peers
638                for peer in &self.explicit_peers {
639                    if set.contains(peer) {
640                        recipient_peers.insert(*peer);
641                    }
642                }
643
644                // Floodsub peers
645                for (peer, kind) in &self.peer_protocols {
646                    if kind == &PeerKind::Floodsub
647                        && !self
648                            .score_below_threshold(peer, |ts| ts.publish_threshold)
649                            .0
650                    {
651                        recipient_peers.insert(*peer);
652                    }
653                }
654
655                // Gossipsub peers
656                if self.mesh.get(&topic_hash).is_none() {
657                    debug!("Topic: {:?} not in the mesh", topic_hash);
658                    // If we have fanout peers add them to the map.
659                    if self.fanout.contains_key(&topic_hash) {
660                        for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
661                            recipient_peers.insert(*peer);
662                        }
663                    } else {
664                        // We have no fanout peers, select mesh_n of them and add them to the fanout
665                        let mesh_n = self.config.mesh_n();
666                        let new_peers = get_random_peers(
667                            &self.topic_peers,
668                            &self.peer_protocols,
669                            &topic_hash,
670                            mesh_n,
671                            {
672                                |p| {
673                                    !self.explicit_peers.contains(p)
674                                        && !self
675                                            .score_below_threshold(p, |pst| pst.publish_threshold)
676                                            .0
677                                }
678                            },
679                        );
680                        // Add the new peers to the fanout and recipient peers
681                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
682                        for peer in new_peers {
683                            debug!("Peer added to fanout: {:?}", peer);
684                            recipient_peers.insert(peer);
685                        }
686                    }
687                    // We are publishing to fanout peers - update the time we published
688                    self.fanout_last_pub
689                        .insert(topic_hash.clone(), Instant::now());
690                }
691            }
692        }
693
694        // if there are no peers to send the message, we will send the message to mesh_n random peers
695        if recipient_peers.is_empty() {
696            let mut rng = thread_rng();
697            recipient_peers.extend(
698                self.peer_topics.iter().map(|(k,_v)| k.clone())
699                    .choose_multiple(&mut rng, self.config.mesh_n() )
700                    .iter().cloned()
701            );
702        }
703
704        if recipient_peers.is_empty() && !mesh_peers_sent {
705            return Err(PublishError::InsufficientPeers);
706        }
707
708        // If the message isn't a duplicate and we have sent it to some peers add it to the
709        // duplicate cache and memcache.
710        self.duplicate_cache.insert(msg_id.clone());
711        self.mcache.put(&msg_id, raw_message);
712
713        // If the message is anonymous or has a random author add it to the published message ids
714        // cache.
715        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
716            if !self.config.allow_self_origin() {
717                self.published_message_ids.insert(msg_id.clone());
718            }
719        }
720
721        // Send to peers we know are subscribed to the topic.
722        for peer_id in recipient_peers.iter() {
723            debug!("Sending message to peer: {:?}", peer_id);
724            self.send_message(*peer_id, event.clone())?;
725        }
726
727        info!("Published message: {:?}", &msg_id);
728        Ok(msg_id)
729    }
730
731    /// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after
732    /// the message got validated by the caller. Messages are stored in the ['Memcache'] and
733    /// validation is expected to be fast enough that the messages should still exist in the cache.
734    /// There are three possible validation outcomes and the outcome is given in acceptance.
735    ///
736    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
737    /// network. The `propagation_source` parameter indicates who the message was received by and
738    /// will not be forwarded back to that peer.
739    ///
740    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
741    /// and the Pâ‚„ penalty will be applied to the `propagation_source`.
742    //
743    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
744    /// but no Pâ‚„ penalty will be applied.
745    ///
746    /// This function will return true if the message was found in the cache and false if was not
747    /// in the cache anymore.
748    ///
749    /// This should only be called once per message.
750    pub fn report_message_validation_result(
751        &mut self,
752        msg_id: &MessageId,
753        propagation_source: &PeerId,
754        acceptance: MessageAcceptance,
755    ) -> Result<bool, PublishError> {
756        let reject_reason = match acceptance {
757            MessageAcceptance::Accept => {
758                let raw_message = match self.mcache.validate(msg_id) {
759                    Some(raw_message) => raw_message.clone(),
760                    None => {
761                        warn!(
762                            "Message not in cache. Ignoring forwarding. Message Id: {}",
763                            msg_id
764                        );
765                        return Ok(false);
766                    }
767                };
768                self.forward_msg(msg_id, raw_message, Some(propagation_source))?;
769                return Ok(true);
770            }
771            MessageAcceptance::Reject => {
772                self.disconnect_peer(propagation_source.clone(), true );
773                RejectReason::ValidationFailed
774            },
775            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
776        };
777
778        if let Some(raw_message) = self.mcache.remove(msg_id) {
779            // Tell peer_score about reject
780            if let Some((peer_score, ..)) = &mut self.peer_score {
781                peer_score.reject_message(
782                    propagation_source,
783                    msg_id,
784                    &raw_message.topic,
785                    reject_reason,
786                );
787            }
788            Ok(true)
789        } else {
790            warn!("Rejected message not in cache. Message Id: {}", msg_id);
791            Ok(false)
792        }
793    }
794
795    /// Adds a new peer to the list of explicitly connected peers.
796    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
797        debug!("Adding explicit peer {}", peer_id);
798
799        self.explicit_peers.insert(*peer_id);
800
801        self.check_explicit_peer_connection(peer_id);
802    }
803
804    /// This removes the peer from explicitly connected peers, note that this does not disconnect
805    /// the peer.
806    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
807        debug!("Removing explicit peer {}", peer_id);
808        self.explicit_peers.remove(peer_id);
809    }
810
811    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
812    /// created by this peer will be rejected.
813    pub fn blacklist_peer(&mut self, peer_id: &PeerId, duration: Duration ) {
814        if self.blacklisted_peers.insert(*peer_id, Instant::now().add(duration) ).is_none() {
815            debug!("Peer has been blacklisted: {}", peer_id);
816        }
817    }
818
819    /// Removes a peer from the blacklist if it has previously been blacklisted.
820    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
821        if self.blacklisted_peers.remove(peer_id).is_some() {
822            debug!("Peer has been removed from the blacklist: {}", peer_id);
823        }
824    }
825
826    /// Activates the peer scoring system with the given parameters. This will reset all scores
827    /// if there was already another peer scoring system activated. Returns an error if the
828    /// params are not valid or if they got already set.
829    pub fn with_peer_score(
830        &mut self,
831        params: PeerScoreParams,
832        threshold: PeerScoreThresholds,
833    ) -> Result<(), String> {
834        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
835    }
836
837    /// Activates the peer scoring system with the given parameters and a message delivery time
838    /// callback. Returns an error if the parameters got already set.
839    pub fn with_peer_score_and_message_delivery_time_callback(
840        &mut self,
841        params: PeerScoreParams,
842        threshold: PeerScoreThresholds,
843        callback: Option<fn(&PeerId, &TopicHash, f64)>,
844    ) -> Result<(), String> {
845        params.validate()?;
846        threshold.validate()?;
847
848        if self.peer_score.is_some() {
849            return Err("Peer score set twice".into());
850        }
851
852        let interval = Interval::new(params.decay_interval);
853        let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
854        self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
855        Ok(())
856    }
857
858    /// Sets scoring parameters for a topic.
859    ///
860    /// The [`Self::with_peer_score()`] must first be called to initialise peer scoring.
861    pub fn set_topic_params<H: Hasher>(
862        &mut self,
863        topic: Topic<H>,
864        params: TopicScoreParams,
865    ) -> Result<(), &'static str> {
866        if let Some((peer_score, ..)) = &mut self.peer_score {
867            peer_score.set_topic_params(topic.hash(), params);
868            Ok(())
869        } else {
870            Err("Peer score must be initialised with `with_peer_score()`")
871        }
872    }
873
874    /// Sets the application specific score for a peer. Returns true if scoring is active and
875    /// the peer is connected or if the score of the peer is not yet expired, false otherwise.
876    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
877        if let Some((peer_score, ..)) = &mut self.peer_score {
878            peer_score.set_application_score(peer_id, new_score)
879        } else {
880            false
881        }
882    }
883
884    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
885    fn join(&mut self, topic_hash: &TopicHash) {
886        debug!("Running JOIN for topic: {:?}", topic_hash);
887
888        // if we are already in the mesh, return
889        if self.mesh.contains_key(topic_hash) {
890            info!("JOIN: The topic is already in the mesh, ignoring JOIN");
891            return;
892        }
893
894        let mut added_peers = HashSet::new();
895
896        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
897        // removing the fanout entry.
898        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
899            debug!(
900                "JOIN: Removing peers from the fanout for topic: {:?}",
901                topic_hash
902            );
903
904            // remove explicit peers, peers with negative scores, and backoffed peers
905            peers = peers
906                .into_iter()
907                .filter(|p| {
908                    !self.explicit_peers.contains(p)
909                        && !self.score_below_threshold(p, |_| 0.0).0
910                        && !self.backoffs.is_backoff_with_slack(topic_hash, p)
911                })
912                .collect();
913
914            // Add up to mesh_n of them them to the mesh
915            // NOTE: These aren't randomly added, currently FIFO
916            let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
917            debug!(
918                "JOIN: Adding {:?} peers from the fanout for topic: {:?}",
919                add_peers, topic_hash
920            );
921            added_peers.extend(peers.iter().cloned().take(add_peers));
922            self.mesh.insert(
923                topic_hash.clone(),
924                peers.into_iter().take(add_peers).collect(),
925            );
926            // remove the last published time
927            self.fanout_last_pub.remove(topic_hash);
928        }
929
930        // check if we need to get more peers, which we randomly select
931        if added_peers.len() < self.config.mesh_n() {
932            // get the peers
933            let new_peers = get_random_peers(
934                &self.topic_peers,
935                &self.peer_protocols,
936                topic_hash,
937                self.config.mesh_n() - added_peers.len(),
938                |peer| {
939                    !added_peers.contains(peer)
940                        && !self.explicit_peers.contains(peer)
941                        && !self.score_below_threshold(peer, |_| 0.0).0
942                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
943                },
944            );
945            added_peers.extend(new_peers.clone());
946            // add them to the mesh
947            debug!(
948                "JOIN: Inserting {:?} random peers into the mesh",
949                new_peers.len()
950            );
951            let mesh_peers = self
952                .mesh
953                .entry(topic_hash.clone())
954                .or_insert_with(Default::default);
955            mesh_peers.extend(new_peers);
956        }
957
958        for peer_id in added_peers {
959            // Send a GRAFT control message
960            info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
961            if let Some((peer_score, ..)) = &mut self.peer_score {
962                peer_score.graft(&peer_id, topic_hash.clone());
963            }
964            Self::control_pool_add(
965                &mut self.control_pool,
966                peer_id,
967                GossipsubControlAction::Graft {
968                    topic_hash: topic_hash.clone(),
969                },
970            );
971        }
972        debug!("Completed JOIN for topic: {:?}", topic_hash);
973    }
974
975    /// Creates a PRUNE gossipsub action.
976    fn make_prune(
977        &mut self,
978        topic_hash: &TopicHash,
979        peer: &PeerId,
980        do_px: bool,
981    ) -> GossipsubControlAction {
982        if let Some((peer_score, ..)) = &mut self.peer_score {
983            peer_score.prune(peer, topic_hash.clone());
984        }
985
986        match self.peer_protocols.get(peer) {
987            Some(PeerKind::Floodsub) => {
988                error!("Attempted to prune a Floodsub peer");
989            }
990            Some(PeerKind::Gossipsub) => {
991                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
992                return GossipsubControlAction::Prune {
993                    topic_hash: topic_hash.clone(),
994                    peers: Vec::new(),
995                    backoff: None,
996                };
997            }
998            None => {
999                error!("Attempted to Prune an unknown peer");
1000            }
1001            _ => {} // Gossipsub 1.1 peer perform the `Prune`
1002        }
1003
1004        // Select peers for peer exchange
1005        let peers = if do_px {
1006            get_random_peers(
1007                &self.topic_peers,
1008                &self.peer_protocols,
1009                &topic_hash,
1010                self.config.prune_peers(),
1011                |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1012            )
1013            .into_iter()
1014            .map(|p| PeerInfo { peer_id: Some(p) })
1015            .collect()
1016        } else {
1017            Vec::new()
1018        };
1019
1020        // update backoff
1021        self.backoffs
1022            .update_backoff(topic_hash, peer, self.config.prune_backoff());
1023
1024        GossipsubControlAction::Prune {
1025            topic_hash: topic_hash.clone(),
1026            peers,
1027            backoff: Some(self.config.prune_backoff().as_secs()),
1028        }
1029    }
1030
1031    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1032    fn leave(&mut self, topic_hash: &TopicHash) {
1033        debug!("Running LEAVE for topic {:?}", topic_hash);
1034
1035        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1036        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1037            for peer in peers {
1038                // Send a PRUNE control message
1039                info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
1040                let control = self.make_prune(topic_hash, &peer, self.config.do_px());
1041                Self::control_pool_add(&mut self.control_pool, peer, control);
1042            }
1043        }
1044        debug!("Completed LEAVE for topic: {:?}", topic_hash);
1045    }
1046
1047    /// Checks if the given peer is still connected and if not dials the peer again.
1048    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1049        if !self.peer_topics.contains_key(peer_id) {
1050            // Connect to peer
1051            debug!("Connecting to explicit peer {:?}", peer_id);
1052            self.events.push_back(NetworkBehaviourAction::DialPeer {
1053                peer_id: *peer_id,
1054                condition: DialPeerCondition::Disconnected,
1055            });
1056        }
1057    }
1058
1059    /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the
1060    /// `threshold` parameter.
1061    fn score_below_threshold(
1062        &self,
1063        peer_id: &PeerId,
1064        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1065    ) -> (bool, f64) {
1066        Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1067    }
1068
1069    fn score_below_threshold_from_scores(
1070        peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
1071        peer_id: &PeerId,
1072        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1073    ) -> (bool, f64) {
1074        if let Some((peer_score, thresholds, ..)) = peer_score {
1075            let score = peer_score.score(peer_id);
1076            if score < threshold(thresholds) {
1077                return (true, score);
1078            }
1079            (false, score)
1080        } else {
1081            (false, 0.0)
1082        }
1083    }
1084
1085    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1086    /// requests it with an IWANT control message.
1087    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1088        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1089        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1090            debug!(
1091                "IHAVE: ignoring peer {:?} with score below threshold [score = {}]",
1092                peer_id, score
1093            );
1094            return;
1095        }
1096
1097        // IHAVE flood protection
1098        let peer_have = self
1099            .count_received_ihave
1100            .entry(*peer_id)
1101            .or_insert(0);
1102        *peer_have += 1;
1103        if *peer_have > self.config.max_ihave_messages() {
1104            debug!(
1105                "IHAVE: peer {} has advertised too many times ({}) within this heartbeat \
1106            interval; ignoring",
1107                peer_id, *peer_have
1108            );
1109            return;
1110        }
1111
1112        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1113            if *iasked >= self.config.max_ihave_length() {
1114                debug!(
1115                    "IHAVE: peer {} has already advertised too many messages ({}); ignoring",
1116                    peer_id, *iasked
1117                );
1118                return;
1119            }
1120        }
1121
1122        debug!("Handling IHAVE for peer: {:?}", peer_id);
1123
1124        // use a hashset to avoid duplicates efficiently
1125        let mut iwant_ids = HashSet::new();
1126
1127        for (topic, ids) in ihave_msgs {
1128            // only process the message if we are subscribed
1129            if !self.mesh.contains_key(&topic) {
1130                debug!(
1131                    "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
1132                    topic
1133                );
1134                continue;
1135            }
1136
1137            for id in ids {
1138                if !self.duplicate_cache.contains(&id) {
1139                    // have not seen this message, request it
1140                    iwant_ids.insert(id);
1141                }
1142            }
1143        }
1144
1145        if !iwant_ids.is_empty() {
1146            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1147            let mut iask = iwant_ids.len();
1148            if *iasked + iask > self.config.max_ihave_length() {
1149                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1150            }
1151
1152            // Send the list of IWANT control messages
1153            debug!(
1154                "IHAVE: Asking for {} out of {} messages from {}",
1155                iask,
1156                iwant_ids.len(),
1157                peer_id
1158            );
1159
1160            //ask in random order
1161            let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
1162            let mut rng = thread_rng();
1163            iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
1164
1165            iwant_ids_vec.truncate(iask as usize);
1166            *iasked += iask;
1167
1168            let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
1169            if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
1170                gossip_promises.add_promise(
1171                    *peer_id,
1172                    &message_ids,
1173                    Instant::now() + self.config.iwant_followup_time(),
1174                );
1175            }
1176            debug!(
1177                "IHAVE: Asking for the following messages from {}: {:?}",
1178                peer_id, message_ids
1179            );
1180
1181            Self::control_pool_add(
1182                &mut self.control_pool,
1183                *peer_id,
1184                GossipsubControlAction::IWant { message_ids },
1185            );
1186        }
1187        debug!("Completed IHAVE handling for peer: {:?}", peer_id);
1188    }
1189
1190    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1191    /// forwarded to the requesting peer.
1192    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1193        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1194        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1195            debug!(
1196                "IWANT: ignoring peer {:?} with score below threshold [score = {}]",
1197                peer_id, score
1198            );
1199            return;
1200        }
1201
1202        debug!("Handling IWANT for peer: {:?}", peer_id);
1203        // build a hashmap of available messages
1204        let mut cached_messages = HashMap::new();
1205
1206        for id in iwant_msgs {
1207            // If we have it and the IHAVE count is not above the threshold, add it do the
1208            // cached_messages mapping
1209            if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
1210                if count > self.config.gossip_retransimission() {
1211                    debug!(
1212                        "IWANT: Peer {} has asked for message {} too many times; ignoring \
1213                    request",
1214                        peer_id, &id
1215                    );
1216                } else {
1217                    cached_messages.insert(id.clone(), msg.clone());
1218                }
1219            }
1220        }
1221
1222        if !cached_messages.is_empty() {
1223            debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
1224            // Send the messages to the peer
1225            let message_list = cached_messages
1226                .into_iter()
1227                .map(|entry| entry.1)
1228                .collect();
1229            if self
1230                .send_message(
1231                    *peer_id,
1232                    GossipsubRpc {
1233                        subscriptions: Vec::new(),
1234                        messages: message_list,
1235                        control_msgs: Vec::new(),
1236                    }
1237                    .into_protobuf(),
1238                )
1239                .is_err()
1240            {
1241                error!("Failed to send cached messages. Messages too large");
1242            }
1243        }
1244        debug!("Completed IWANT handling for peer: {}", peer_id);
1245    }
1246
1247    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
1248    /// responds with PRUNE messages.
1249    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1250        debug!("Handling GRAFT message for peer: {}", peer_id);
1251
1252        let mut to_prune_topics = HashSet::new();
1253
1254        let mut do_px = self.config.do_px();
1255
1256        // we don't GRAFT to/from explicit peers; complain loudly if this happens
1257        if self.explicit_peers.contains(peer_id) {
1258            warn!("GRAFT: ignoring request from direct peer {}", peer_id);
1259            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
1260            to_prune_topics = HashSet::from_iter(topics.into_iter());
1261            // but don't PX
1262            do_px = false
1263        } else {
1264            let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1265            let now = Instant::now();
1266            for topic_hash in topics {
1267                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1268                    // if the peer is already in the mesh ignore the graft
1269                    if peers.contains(peer_id) {
1270                        debug!(
1271                            "GRAFT: Received graft for peer {:?} that is already in topic {:?}",
1272                            peer_id, &topic_hash
1273                        );
1274                        continue;
1275                    }
1276
1277                    // make sure we are not backing off that peer
1278                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1279                    {
1280                        if backoff_time > now {
1281                            warn!(
1282                                "GRAFT: peer attempted graft within backoff time, penalizing {}",
1283                                peer_id
1284                            );
1285                            // add behavioural penalty
1286                            if let Some((peer_score, ..)) = &mut self.peer_score {
1287                                peer_score.add_penalty(peer_id, 1);
1288
1289                                // check the flood cutoff
1290                                let flood_cutoff = (backoff_time
1291                                    + self.config.graft_flood_threshold())
1292                                    - self.config.prune_backoff();
1293                                if flood_cutoff > now {
1294                                    //extra penalty
1295                                    peer_score.add_penalty(peer_id, 1);
1296                                }
1297                            }
1298                            //no PX
1299                            do_px = false;
1300
1301                            to_prune_topics.insert(topic_hash.clone());
1302                            continue;
1303                        }
1304                    }
1305
1306                    //check the score
1307                    if below_zero {
1308                        // we don't GRAFT peers with negative score
1309                        debug!(
1310                            "GRAFT: ignoring peer {:?} with negative score [score = {}, \
1311                        topic = {}]",
1312                            peer_id, score, topic_hash
1313                        );
1314                        // we do send them PRUNE however, because it's a matter of protocol correctness
1315                        to_prune_topics.insert(topic_hash.clone());
1316                        // but we won't PX to them
1317                        do_px = false;
1318                        continue;
1319                    }
1320
1321                    // check mesh upper bound and only allow graft if the upper bound is not reached or
1322                    // if it is an outbound peer
1323                    if peers.len() >= self.config.mesh_n_high()
1324                        && !self.outbound_peers.contains(peer_id)
1325                    {
1326                        to_prune_topics.insert(topic_hash.clone());
1327                        continue;
1328                    }
1329
1330                    // add peer to the mesh
1331                    info!(
1332                        "GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
1333                        peer_id, &topic_hash
1334                    );
1335                    peers.insert(*peer_id);
1336
1337                    if let Some((peer_score, ..)) = &mut self.peer_score {
1338                        peer_score.graft(peer_id, topic_hash);
1339                    }
1340                } else {
1341                    // don't do PX when there is an unknown topic to avoid leaking our peers
1342                    do_px = false;
1343                    debug!(
1344                        "GRAFT: Received graft for unknown topic {:?} from peer {:?}",
1345                        &topic_hash, peer_id
1346                    );
1347                    // spam hardening: ignore GRAFTs for unknown topics
1348                    continue;
1349                }
1350            }
1351        }
1352
1353        if !to_prune_topics.is_empty() {
1354            // build the prune messages to send
1355            let prune_messages = to_prune_topics
1356                .iter()
1357                .map(|t| self.make_prune(t, peer_id, do_px))
1358                .collect();
1359            // Send the prune messages to the peer
1360            info!(
1361                "GRAFT: Not subscribed to topics -  Sending PRUNE to peer: {}",
1362                peer_id
1363            );
1364
1365            if self
1366                .send_message(
1367                    *peer_id,
1368                    GossipsubRpc {
1369                        subscriptions: Vec::new(),
1370                        messages: Vec::new(),
1371                        control_msgs: prune_messages,
1372                    }
1373                    .into_protobuf(),
1374                )
1375                .is_err()
1376            {
1377                error!("Failed to send graft. Message too large");
1378            }
1379        }
1380        debug!("Completed GRAFT handling for peer: {}", peer_id);
1381    }
1382
1383    fn remove_peer_from_mesh(
1384        &mut self,
1385        peer_id: &PeerId,
1386        topic_hash: &TopicHash,
1387        backoff: Option<u64>,
1388        always_update_backoff: bool,
1389    ) {
1390        let mut update_backoff = always_update_backoff;
1391        if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1392            // remove the peer if it exists in the mesh
1393            if peers.remove(peer_id) {
1394                info!(
1395                    "PRUNE: Removing peer: {} from the mesh for topic: {}",
1396                    peer_id.to_string(),
1397                    topic_hash
1398                );
1399
1400                if let Some((peer_score, ..)) = &mut self.peer_score {
1401                    peer_score.prune(peer_id, topic_hash.clone());
1402                }
1403
1404                update_backoff = true;
1405            }
1406        }
1407        if update_backoff {
1408            let time = if let Some(backoff) = backoff {
1409                Duration::from_secs(backoff)
1410            } else {
1411                self.config.prune_backoff()
1412            };
1413            // is there a backoff specified by the peer? if so obey it.
1414            self.backoffs.update_backoff(&topic_hash, peer_id, time);
1415        }
1416    }
1417
1418    /// Handles PRUNE control messages. Removes peer from the mesh.
1419    fn handle_prune(
1420        &mut self,
1421        peer_id: &PeerId,
1422        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1423    ) {
1424        debug!("Handling PRUNE message for peer: {}", peer_id);
1425        let (below_threshold, score) =
1426            self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1427        for (topic_hash, px, backoff) in prune_data {
1428            self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);
1429
1430            if self.mesh.contains_key(&topic_hash) {
1431                //connect to px peers
1432                if !px.is_empty() {
1433                    // we ignore PX from peers with insufficient score
1434                    if below_threshold {
1435                        debug!(
1436                            "PRUNE: ignoring PX from peer {:?} with insufficient score \
1437                             [score ={} topic = {}]",
1438                            peer_id, score, topic_hash
1439                        );
1440                        continue;
1441                    }
1442
1443                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1444                    // know their multiaddr. Until SignedRecords are spec'd this
1445                    // remains a stub. By default `config.prune_peers()` is set to zero and
1446                    // this is skipped. If the user modifies this, this will only be able to
1447                    // dial already known peers (from an external discovery mechanism for
1448                    // example).
1449                    if self.config.prune_peers() > 0 {
1450                        self.px_connect(px);
1451                    }
1452                }
1453            }
1454        }
1455        debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
1456    }
1457
1458    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1459        let n = self.config.prune_peers();
1460        // Ignore peerInfo with no ID
1461        //
1462        //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1463        // signed peer record?
1464        px = px.into_iter().filter(|p| p.peer_id.is_some()).collect();
1465        if px.len() > n {
1466            // only use at most prune_peers many random peers
1467            let mut rng = thread_rng();
1468            px.partial_shuffle(&mut rng, n);
1469            px = px.into_iter().take(n).collect();
1470        }
1471
1472        for p in px {
1473            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1474            // it, see https://github.com/libp2p/specs/pull/217
1475            if let Some(peer_id) = p.peer_id {
1476                // mark as px peer
1477                self.px_peers.insert(peer_id);
1478
1479                // dial peer
1480                self.events.push_back(NetworkBehaviourAction::DialPeer {
1481                    peer_id,
1482                    condition: DialPeerCondition::Disconnected,
1483                });
1484            }
1485        }
1486    }
1487
1488    /// Applies some basic checks to whether this message is valid. Does not apply user validation
1489    /// checks.
1490    fn message_is_valid(
1491        &mut self,
1492        msg_id: &MessageId,
1493        raw_message: &mut RawGossipsubMessage,
1494        propagation_source: &PeerId,
1495    ) -> bool {
1496        debug!(
1497            "Handling message: {:?} from peer: {}",
1498            msg_id,
1499            propagation_source.to_string(),
1500        );
1501
1502        // Reject any message from a blacklisted peer
1503        if self.blacklisted_peers.contains_key(propagation_source) {
1504            debug!(
1505                "Rejecting message from blacklisted peer: {}",
1506                propagation_source
1507            );
1508            if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1509                peer_score.reject_message(
1510                    propagation_source,
1511                    msg_id,
1512                    &raw_message.topic,
1513                    RejectReason::BlackListedPeer,
1514                );
1515                gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
1516            }
1517            // Should not happen. The peer connection should be killed the next moment
1518            self.disconnect_peer(*propagation_source, true);
1519            return false;
1520        }
1521
1522        // Also reject any message that originated from a blacklisted peer
1523        if let Some(source) = raw_message.source.as_ref() {
1524            if self.blacklisted_peers.contains_key(source) {
1525                debug!(
1526                    "Rejecting message from peer {} because of blacklisted source: {}",
1527                    propagation_source, source
1528                );
1529                if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1530                    peer_score.reject_message(
1531                        propagation_source,
1532                        msg_id,
1533                        &raw_message.topic,
1534                        RejectReason::BlackListedSource,
1535                    );
1536                    gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource);
1537                }
1538                return false;
1539            }
1540        }
1541
1542        // If we are not validating messages, assume this message is validated
1543        // This will allow the message to be gossiped without explicitly calling
1544        // `validate_message`.
1545        if !self.config.validate_messages() {
1546            raw_message.validated = true;
1547        }
1548
1549        // reject messages claiming to be from ourselves but not locally published
1550        let self_published = !self.config.allow_self_origin()
1551            && if let Some(own_id) = self.publish_config.get_own_id() {
1552                own_id != propagation_source
1553                    && raw_message.source.as_ref().map_or(false, |s| s == own_id)
1554            } else {
1555                self.published_message_ids.contains(&msg_id)
1556            };
1557
1558        if self_published {
1559            debug!(
1560                "Dropping message {} claiming to be from self but forwarded from {}",
1561                msg_id, propagation_source
1562            );
1563            if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
1564                peer_score.reject_message(
1565                    propagation_source,
1566                    msg_id,
1567                    &raw_message.topic,
1568                    RejectReason::SelfOrigin,
1569                );
1570                gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin);
1571            }
1572            return false;
1573        }
1574
1575        true
1576    }
1577
1578    /// Handles a newly received [`RawGossipsubMessage`].
1579    ///
1580    /// Forwards the message to all peers in the mesh.
1581    fn handle_received_message(
1582        &mut self,
1583        mut raw_message: RawGossipsubMessage,
1584        propagation_source: &PeerId,
1585    ) {
1586        let fast_message_id = self.config.fast_message_id(&raw_message);
1587        if let Some(fast_message_id) = fast_message_id.as_ref() {
1588            if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) {
1589                let msg_id = msg_id.clone();
1590                if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1591                    self.handle_invalid_message(
1592                        propagation_source,
1593                        raw_message,
1594                        ValidationError::TransformFailed,
1595                    );
1596                    return;
1597                }
1598                if let Some((peer_score, ..)) = &mut self.peer_score {
1599                    peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic);
1600                }
1601                return;
1602            }
1603        }
1604
1605        // Try and perform the data transform to the message. If it fails, consider it invalid.
1606        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1607            Ok(message) => message,
1608            Err(e) => {
1609                debug!("Invalid message. Transform error: {:?}", e);
1610                // Reject the message and return
1611                self.handle_invalid_message(
1612                    propagation_source,
1613                    raw_message,
1614                    ValidationError::TransformFailed,
1615                );
1616                return;
1617            }
1618        };
1619
1620        // Calculate the message id on the transformed data.
1621        let msg_id = self.config.message_id(&message);
1622
1623        // Check the validity of the message
1624        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1625        // and instead continually penalize peers that repeatedly send this message.
1626        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1627            return;
1628        }
1629
1630        // Add the message to the duplicate caches
1631        if let Some(fast_message_id) = fast_message_id {
1632            // add id to cache
1633            self.fast_messsage_id_cache
1634                .entry(fast_message_id)
1635                .or_insert_with(|| msg_id.clone());
1636        }
1637        if !self.duplicate_cache.insert(msg_id.clone()) {
1638            debug!(
1639                "Message already received, ignoring. Message: {}",
1640                msg_id
1641            );
1642            if let Some((peer_score, ..)) = &mut self.peer_score {
1643                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1644            }
1645            return;
1646        }
1647        debug!(
1648            "Put message {:?} in duplicate_cache and resolve promises",
1649            msg_id
1650        );
1651
1652        // Tells score that message arrived (but is maybe not fully validated yet).
1653        // Consider the message as delivered for gossip promises.
1654        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1655            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1656            gossip_promises.message_delivered(&msg_id);
1657        }
1658
1659        // Add the message to our memcache
1660        self.mcache.put(&msg_id, raw_message.clone());
1661
1662        // Dispatch the message to the user if we are subscribed to any of the topics
1663        // In validation case
1664        if self.config.validate_messages() || self.mesh.contains_key(&message.topic) {
1665            debug!("Sending received message to user");
1666            self.events.push_back(NetworkBehaviourAction::GenerateEvent(
1667                GossipsubEvent::Message {
1668                    propagation_source: *propagation_source,
1669                    message_id: msg_id.clone(),
1670                    message,
1671                },
1672            ));
1673        } else {
1674            debug!(
1675                "Received message on a topic we are not subscribed to: {:?}",
1676                message.topic
1677            );
1678            return;
1679        }
1680
1681        // forward the message to mesh peers, if no validation is required
1682        if !self.config.validate_messages() {
1683            if self
1684                .forward_msg(&msg_id, raw_message, Some(propagation_source))
1685                .is_err()
1686            {
1687                error!("Failed to forward message. Too large");
1688            }
1689            debug!("Completed message handling for message: {:?}", msg_id);
1690        }
1691    }
1692
1693    // Handles invalid messages received.
1694    fn handle_invalid_message(
1695        &mut self,
1696        propagation_source: &PeerId,
1697        raw_message: RawGossipsubMessage,
1698        validation_error: ValidationError,
1699    ) {
1700        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1701            let reason = RejectReason::ValidationError(validation_error);
1702            let fast_message_id_cache = &self.fast_messsage_id_cache;
1703            if let Some(msg_id) = self
1704                .config
1705                .fast_message_id(&raw_message)
1706                .and_then(|id| fast_message_id_cache.get(&id))
1707            {
1708                peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason);
1709                gossip_promises.reject_message(msg_id, &reason);
1710            } else {
1711                // The message is invalid, we reject it ignoring any gossip promises. If a peer is
1712                // advertising this message via an IHAVE and it's invalid it will be double
1713                // penalized, one for sending us an invalid and again for breaking a promise.
1714                peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1715            }
1716        }
1717    }
1718
1719    /// Handles received subscriptions.
1720    fn handle_received_subscriptions(
1721        &mut self,
1722        subscriptions: &[GossipsubSubscription],
1723        propagation_source: &PeerId,
1724    ) {
1725        debug!(
1726            "Handling subscriptions: {:?}, from source: {}",
1727            subscriptions,
1728            propagation_source.to_string()
1729        );
1730
1731        let mut unsubscribed_peers = Vec::new();
1732
1733        let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
1734            Some(topics) => topics,
1735            None => {
1736                error!(
1737                    "Subscription by unknown peer: {}",
1738                    propagation_source.to_string()
1739                );
1740                return;
1741            }
1742        };
1743
1744        // Collect potential graft messages for the peer.
1745        let mut grafts = Vec::new();
1746
1747        // Notify the application about the subscription, after the grafts are sent.
1748        let mut application_event = Vec::new();
1749
1750        let filtered_topics = match self
1751            .subscription_filter
1752            .filter_incoming_subscriptions(subscriptions, subscribed_topics)
1753        {
1754            Ok(topics) => topics,
1755            Err(s) => {
1756                error!(
1757                    "Subscription filter error: {}; ignoring RPC from peer {}",
1758                    s,
1759                    propagation_source.to_string()
1760                );
1761                return;
1762            }
1763        };
1764
1765        for subscription in filtered_topics {
1766            // get the peers from the mapping, or insert empty lists if the topic doesn't exist
1767            let peer_list = self
1768                .topic_peers
1769                .entry(subscription.topic_hash.clone())
1770                .or_insert_with(Default::default);
1771
1772            match subscription.action {
1773                GossipsubSubscriptionAction::Subscribe => {
1774                    if peer_list.insert(*propagation_source) {
1775                        debug!(
1776                            "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
1777                            propagation_source.to_string(),
1778                            subscription.topic_hash
1779                        );
1780                    }
1781
1782                    // add to the peer_topics mapping
1783                    subscribed_topics.insert(subscription.topic_hash.clone());
1784
1785                    // if the mesh needs peers add the peer to the mesh
1786                    if !self.explicit_peers.contains(propagation_source)
1787                        && match self.peer_protocols.get(propagation_source) {
1788                            Some(PeerKind::Gossipsubv1_1) => true,
1789                            Some(PeerKind::Gossipsub) => true,
1790                            _ => false,
1791                        }
1792                        && !Self::score_below_threshold_from_scores(
1793                            &self.peer_score,
1794                            propagation_source,
1795                            |_| 0.0,
1796                        )
1797                        .0
1798                        && !self
1799                            .backoffs
1800                            .is_backoff_with_slack(&subscription.topic_hash, propagation_source)
1801                    {
1802                        if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
1803                            if peers.len() < self.config.mesh_n_low()
1804                                && peers.insert(*propagation_source)
1805                            {
1806                                debug!(
1807                                    "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
1808                                    propagation_source.to_string(),
1809                                    subscription.topic_hash
1810                                );
1811                                // send graft to the peer
1812                                debug!(
1813                                    "Sending GRAFT to peer {} for topic {:?}",
1814                                    propagation_source.to_string(),
1815                                    subscription.topic_hash
1816                                );
1817                                if let Some((peer_score, ..)) = &mut self.peer_score {
1818                                    peer_score
1819                                        .graft(propagation_source, subscription.topic_hash.clone());
1820                                }
1821                                grafts.push(GossipsubControlAction::Graft {
1822                                    topic_hash: subscription.topic_hash.clone(),
1823                                });
1824                            }
1825                        }
1826                    }
1827                    // generates a subscription event to be polled
1828                    application_event.push(NetworkBehaviourAction::GenerateEvent(
1829                        GossipsubEvent::Subscribed {
1830                            peer_id: *propagation_source,
1831                            topic: subscription.topic_hash.clone(),
1832                        },
1833                    ));
1834                }
1835                GossipsubSubscriptionAction::Unsubscribe => {
1836                    if peer_list.remove(propagation_source) {
1837                        info!(
1838                            "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
1839                            propagation_source.to_string(),
1840                            subscription.topic_hash
1841                        );
1842                    }
1843                    // remove topic from the peer_topics mapping
1844                    subscribed_topics.remove(&subscription.topic_hash);
1845                    unsubscribed_peers
1846                        .push((*propagation_source, subscription.topic_hash.clone()));
1847                    // generate an unsubscribe event to be polled
1848                    application_event.push(NetworkBehaviourAction::GenerateEvent(
1849                        GossipsubEvent::Unsubscribed {
1850                            peer_id: *propagation_source,
1851                            topic: subscription.topic_hash.clone(),
1852                        },
1853                    ));
1854                }
1855            }
1856        }
1857
1858        // remove unsubscribed peers from the mesh if it exists
1859        for (peer_id, topic_hash) in unsubscribed_peers {
1860            self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
1861        }
1862
1863        // If we need to send grafts to peer, do so immediately, rather than waiting for the
1864        // heartbeat.
1865        if !grafts.is_empty()
1866            && self
1867                .send_message(
1868                    *propagation_source,
1869                    GossipsubRpc {
1870                        subscriptions: Vec::new(),
1871                        messages: Vec::new(),
1872                        control_msgs: grafts,
1873                    }
1874                    .into_protobuf(),
1875                )
1876                .is_err()
1877        {
1878            error!("Failed sending grafts. Message too large");
1879        }
1880
1881        // Notify the application of the subscriptions
1882        for event in application_event {
1883            self.events.push_back(event);
1884        }
1885
1886        trace!(
1887            "Completed handling subscriptions from source: {:?}",
1888            propagation_source
1889        );
1890    }
1891
1892    /// Applies penalties to peers that did not respond to our IWANT requests.
1893    fn apply_iwant_penalties(&mut self) {
1894        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1895            for (peer, count) in gossip_promises.get_broken_promises() {
1896                peer_score.add_penalty(&peer, count);
1897            }
1898        }
1899    }
1900
1901    /// Heartbeat function which shifts the memcache and updates the mesh.
1902    fn heartbeat(&mut self) {
1903        self.heartbeat_ticks += 1;
1904
1905        let mut to_graft = HashMap::new();
1906        let mut to_prune = HashMap::new();
1907        let mut no_px = HashSet::new();
1908
1909        // clean up expired backoffs
1910        self.backoffs.heartbeat();
1911
1912        // clean up ihave counters
1913        self.count_sent_iwant.clear();
1914        self.count_received_ihave.clear();
1915
1916        // apply iwant penalties
1917        self.apply_iwant_penalties();
1918
1919        // check connections to explicit peers
1920        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
1921            for p in self.explicit_peers.clone() {
1922                self.check_explicit_peer_connection(&p);
1923            }
1924        }
1925
1926        // cache scores throughout the heartbeat
1927        let mut scores = HashMap::new();
1928        let peer_score = &self.peer_score;
1929        let mut score = |p: &PeerId| match peer_score {
1930            Some((peer_score, ..)) => *scores
1931                .entry(*p)
1932                .or_insert_with(|| peer_score.score(p)),
1933            _ => 0.0,
1934        };
1935
1936        // maintain the mesh for each topic
1937        for (topic_hash, peers) in self.mesh.iter_mut() {
1938            let explicit_peers = &self.explicit_peers;
1939            let backoffs = &self.backoffs;
1940            let topic_peers = &self.topic_peers;
1941            let outbound_peers = &self.outbound_peers;
1942
1943            // drop all peers with negative score, without PX
1944            // if there is at some point a stable retain method for BTreeSet the following can be
1945            // written more efficiently with retain.
1946            let to_remove: Vec<_> = peers
1947                .iter()
1948                .filter(|&p| {
1949                    if score(p) < 0.0 {
1950                        debug!(
1951                            "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
1952                             {}]",
1953                            p,
1954                            score(p),
1955                            topic_hash
1956                        );
1957
1958                        let current_topic = to_prune.entry(*p).or_insert_with(Vec::new);
1959                        current_topic.push(topic_hash.clone());
1960                        no_px.insert(*p);
1961                        true
1962                    } else {
1963                        false
1964                    }
1965                })
1966                .cloned()
1967                .collect();
1968            for peer in to_remove {
1969                peers.remove(&peer);
1970            }
1971
1972            // too little peers - add some
1973            if peers.len() < self.config.mesh_n_low() {
1974                debug!(
1975                    "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}",
1976                    topic_hash,
1977                    peers.len(),
1978                    self.config.mesh_n_low()
1979                );
1980                // not enough peers - get mesh_n - current_length more
1981                let desired_peers = self.config.mesh_n() - peers.len();
1982                let peer_list = get_random_peers(
1983                    topic_peers,
1984                    &self.peer_protocols,
1985                    topic_hash,
1986                    desired_peers,
1987                    |peer| {
1988                        !peers.contains(peer)
1989                            && !explicit_peers.contains(peer)
1990                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
1991                            && score(peer) >= 0.0
1992                    },
1993                );
1994                for peer in &peer_list {
1995                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
1996                    current_topic.push(topic_hash.clone());
1997                }
1998                // update the mesh
1999                debug!("Updating mesh, new mesh: {:?}", peer_list);
2000                peers.extend(peer_list);
2001            }
2002
2003            // too many peers - remove some
2004            if peers.len() > self.config.mesh_n_high() {
2005                debug!(
2006                    "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}",
2007                    topic_hash,
2008                    peers.len(),
2009                    self.config.mesh_n_high()
2010                );
2011                let excess_peer_no = peers.len() - self.config.mesh_n();
2012
2013                // shuffle the peers and then sort by score ascending beginning with the worst
2014                let mut rng = thread_rng();
2015                let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
2016                shuffled.shuffle(&mut rng);
2017                shuffled
2018                    .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal));
2019                // shuffle everything except the last retain_scores many peers (the best ones)
2020                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2021
2022                // count total number of outbound peers
2023                let mut outbound = {
2024                    let outbound_peers = &self.outbound_peers;
2025                    shuffled
2026                        .iter()
2027                        .filter(|p| outbound_peers.contains(*p))
2028                        .count()
2029                };
2030
2031                // remove the first excess_peer_no allowed (by outbound restrictions) peers adding
2032                // them to to_prune
2033                let mut removed = 0;
2034                for peer in shuffled {
2035                    if removed == excess_peer_no {
2036                        break;
2037                    }
2038                    if self.outbound_peers.contains(&peer) {
2039                        if outbound <= self.config.mesh_outbound_min() {
2040                            //do not remove anymore outbound peers
2041                            continue;
2042                        } else {
2043                            //an outbound peer gets removed
2044                            outbound -= 1;
2045                        }
2046                    }
2047
2048                    //remove the peer
2049                    peers.remove(&peer);
2050                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2051                    current_topic.push(topic_hash.clone());
2052                    removed += 1;
2053                }
2054            }
2055
2056            // do we have enough outbound peers?
2057            if peers.len() >= self.config.mesh_n_low() {
2058                // count number of outbound peers we have
2059                let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2060
2061                // if we have not enough outbound peers, graft to some new outbound peers
2062                if outbound < self.config.mesh_outbound_min() {
2063                    let needed = self.config.mesh_outbound_min() - outbound;
2064                    let peer_list = get_random_peers(
2065                        topic_peers,
2066                        &self.peer_protocols,
2067                        topic_hash,
2068                        needed,
2069                        |peer| {
2070                            !peers.contains(peer)
2071                                && !explicit_peers.contains(peer)
2072                                && !backoffs.is_backoff_with_slack(topic_hash, peer)
2073                                && score(peer) >= 0.0
2074                                && outbound_peers.contains(peer)
2075                        },
2076                    );
2077                    for peer in &peer_list {
2078                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2079                        current_topic.push(topic_hash.clone());
2080                    }
2081                    // update the mesh
2082                    debug!("Updating mesh, new mesh: {:?}", peer_list);
2083                    peers.extend(peer_list);
2084                }
2085            }
2086
2087            // should we try to improve the mesh with opportunistic grafting?
2088            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2089                && peers.len() > 1
2090                && self.peer_score.is_some()
2091            {
2092                if let Some((_, thresholds, _, _)) = &self.peer_score {
2093                    // Opportunistic grafting works as follows: we check the median score of peers
2094                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
2095                    // select a few peers at random with score over the median.
2096                    // The intention is to (slowly) improve an underperforming mesh by introducing
2097                    // good scoring peers that may have been gossiping at us. This allows us to
2098                    // get out of sticky situations where we are stuck with poor peers and also
2099                    // recover from churn of good peers.
2100
2101                    // now compute the median peer score in the mesh
2102                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2103                    peers_by_score
2104                        .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal));
2105
2106                    let middle = peers_by_score.len() / 2;
2107                    let median = if peers_by_score.len() % 2 == 0 {
2108                        (score(
2109                            *peers_by_score.get(middle - 1).expect(
2110                                "middle < vector length and middle > 0 since peers.len() > 0",
2111                            ),
2112                        ) + score(*peers_by_score.get(middle).expect("middle < vector length")))
2113                            * 0.5
2114                    } else {
2115                        score(*peers_by_score.get(middle).expect("middle < vector length"))
2116                    };
2117
2118                    // if the median score is below the threshold, select a better peer (if any) and
2119                    // GRAFT
2120                    if median < thresholds.opportunistic_graft_threshold {
2121                        let peer_list = get_random_peers(
2122                            topic_peers,
2123                            &self.peer_protocols,
2124                            topic_hash,
2125                            self.config.opportunistic_graft_peers(),
2126                            |peer| {
2127                                !peers.contains(peer)
2128                                    && !explicit_peers.contains(peer)
2129                                    && !backoffs.is_backoff_with_slack(topic_hash, peer)
2130                                    && score(peer) > median
2131                            },
2132                        );
2133                        for peer in &peer_list {
2134                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2135                            current_topic.push(topic_hash.clone());
2136                        }
2137                        // update the mesh
2138                        debug!(
2139                            "Opportunistically graft in topic {} with peers {:?}",
2140                            topic_hash, peer_list
2141                        );
2142                        peers.extend(peer_list);
2143                    }
2144                }
2145            }
2146        }
2147
2148        // remove expired fanout topics
2149        {
2150            let fanout = &mut self.fanout; // help the borrow checker
2151            let fanout_ttl = self.config.fanout_ttl();
2152            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2153                if *last_pub_time + fanout_ttl < Instant::now() {
2154                    debug!(
2155                        "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
2156                        topic_hash
2157                    );
2158                    fanout.remove(&topic_hash);
2159                    return false;
2160                }
2161                true
2162            });
2163        }
2164
2165        // maintain fanout
2166        // check if our peers are still a part of the topic
2167        for (topic_hash, peers) in self.fanout.iter_mut() {
2168            let mut to_remove_peers = Vec::new();
2169            let publish_threshold = match &self.peer_score {
2170                Some((_, thresholds, _, _)) => thresholds.publish_threshold,
2171                _ => 0.0,
2172            };
2173            for peer in peers.iter() {
2174                // is the peer still subscribed to the topic?
2175                match self.peer_topics.get(peer) {
2176                    Some(topics) => {
2177                        if !topics.contains(&topic_hash) || score(peer) < publish_threshold {
2178                            debug!(
2179                                "HEARTBEAT: Peer removed from fanout for topic: {:?}",
2180                                topic_hash
2181                            );
2182                            to_remove_peers.push(*peer);
2183                        }
2184                    }
2185                    None => {
2186                        // remove if the peer has disconnected
2187                        to_remove_peers.push(*peer);
2188                    }
2189                }
2190            }
2191            for to_remove in to_remove_peers {
2192                peers.remove(&to_remove);
2193            }
2194
2195            // not enough peers
2196            if peers.len() < self.config.mesh_n() {
2197                debug!(
2198                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2199                    peers.len(),
2200                    self.config.mesh_n()
2201                );
2202                let needed_peers = self.config.mesh_n() - peers.len();
2203                let explicit_peers = &self.explicit_peers;
2204                let new_peers = get_random_peers(
2205                    &self.topic_peers,
2206                    &self.peer_protocols,
2207                    topic_hash,
2208                    needed_peers,
2209                    |peer| {
2210                        !peers.contains(peer)
2211                            && !explicit_peers.contains(peer)
2212                            && score(peer) < publish_threshold
2213                    },
2214                );
2215                peers.extend(new_peers);
2216            }
2217        }
2218
2219        if self.peer_score.is_some() {
2220            trace!("Peer_scores: {:?}", {
2221                for peer in self.peer_topics.keys() {
2222                    score(peer);
2223                }
2224                scores
2225            });
2226            trace!("Mesh message deliveries: {:?}", {
2227                self.mesh
2228                    .iter()
2229                    .map(|(t, peers)| {
2230                        (
2231                            t.clone(),
2232                            peers
2233                                .iter()
2234                                .map(|p| {
2235                                    (
2236                                        *p,
2237                                        peer_score
2238                                            .as_ref()
2239                                            .expect("peer_score.is_some()")
2240                                            .0
2241                                            .mesh_message_deliveries(p, t)
2242                                            .unwrap_or(0.0),
2243                                    )
2244                                })
2245                                .collect::<HashMap<PeerId, f64>>(),
2246                        )
2247                    })
2248                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2249            })
2250        }
2251
2252        self.emit_gossip();
2253
2254        // send graft/prunes
2255        if !to_graft.is_empty() | !to_prune.is_empty() {
2256            self.send_graft_prune(to_graft, to_prune, no_px);
2257        }
2258
2259        // piggyback pooled control messages
2260        self.flush_control_pool();
2261
2262        // shift the memcache
2263        self.mcache.shift();
2264
2265        // Maintain the expected number of the peers. Occasionally update with available peers.
2266        if self.heartbeat_ticks % self.config.connection_update_ticks() == 0 {
2267            let mut peers = self.peer_topics.keys().cloned()
2268                .collect::<Vec<PeerId>>();
2269
2270            let mut rng = thread_rng();
2271
2272            while peers.len() > self.config.mesh_n_high() * 2 {
2273                let peer_id = peers.remove( rng.next_u32() as usize % peers.len() );
2274                self.disconnect_peer( peer_id, false );
2275            }
2276
2277            // Update one of the peers with connected PeerId info.
2278            if !peers.is_empty() {
2279                if let Err(e) = self.send_peer_list(&peers[rng.next_u32() as usize % peers.len()]) {
2280                    warn!("Failed to update random peer with a list of connected PeerId, {}", e);
2281                }
2282            }
2283        }
2284
2285        // Update time related data. Let's clean up expired...
2286        if self.heartbeat_ticks % 23 == 0 {
2287            let cur_time = Instant::now();
2288            self.blacklisted_peers.retain( |_, v| *v > cur_time );
2289        }
2290
2291        // heartbeat interval is 3 seconds. Let's dump every minute
2292        if self.heartbeat_ticks % 20 == 0 {
2293            // Let's dmp the data for debugging
2294            info!("Libp2p all peers: {:?}", self.peer_topics.iter().map(|(k,_v)| k.to_string()).collect::<Vec<String>>().join(", ") );
2295            info!("Libp2p outbound peers: {}", self.outbound_peers.iter().map(|p| p.to_string() ).collect::<Vec<String>>().join(", ") );
2296            info!("Libp2p blacklisted peers: {}", self.blacklisted_peers.iter().map(|p| p.0.to_string() ).collect::<Vec<String>>().join(", ") );
2297        }
2298    }
2299
2300    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2301    /// and fanout peers
2302    fn emit_gossip(&mut self) {
2303        let mut rng = thread_rng();
2304        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2305            let mut message_ids = self.mcache.get_gossip_message_ids(&topic_hash);
2306            if message_ids.is_empty() {
2307                return;
2308            }
2309
2310            // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list
2311            if message_ids.len() > self.config.max_ihave_length() {
2312                // we do the truncation (with shuffling) per peer below
2313                debug!(
2314                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2315                    message_ids.len()
2316                );
2317            } else {
2318                // shuffle to emit in random order
2319                message_ids.shuffle(&mut rng);
2320            }
2321
2322            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2323            let n_map = |m| {
2324                max(
2325                    self.config.gossip_lazy(),
2326                    (self.config.gossip_factor() * m as f64) as usize,
2327                )
2328            };
2329            // get gossip_lazy random peers
2330            let to_msg_peers = get_random_peers_dynamic(
2331                &self.topic_peers,
2332                &self.peer_protocols,
2333                &topic_hash,
2334                n_map,
2335                |peer| {
2336                    !peers.contains(peer)
2337                        && !self.explicit_peers.contains(peer)
2338                        && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2339                },
2340            );
2341
2342            debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());
2343
2344            for peer in to_msg_peers {
2345                let mut peer_message_ids = message_ids.clone();
2346
2347                if peer_message_ids.len() > self.config.max_ihave_length() {
2348                    // We do this per peer so that we emit a different set for each peer.
2349                    // we have enough redundancy in the system that this will significantly increase
2350                    // the message coverage when we do truncate.
2351                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2352                    peer_message_ids.truncate(self.config.max_ihave_length());
2353                }
2354
2355                // send an IHAVE message
2356                Self::control_pool_add(
2357                    &mut self.control_pool,
2358                    peer,
2359                    GossipsubControlAction::IHave {
2360                        topic_hash: topic_hash.clone(),
2361                        message_ids: peer_message_ids,
2362                    },
2363                );
2364            }
2365        }
2366    }
2367
2368    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2369    /// messages.
2370    fn send_graft_prune(
2371        &mut self,
2372        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2373        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2374        no_px: HashSet<PeerId>,
2375    ) {
2376        // handle the grafts and overlapping prunes per peer
2377        for (peer, topics) in to_graft.iter() {
2378            for topic in topics {
2379                //inform scoring of graft
2380                if let Some((peer_score, ..)) = &mut self.peer_score {
2381                    peer_score.graft(peer, topic.clone());
2382                }
2383            }
2384            let mut control_msgs: Vec<GossipsubControlAction> = topics
2385                .iter()
2386                .map(|topic_hash| GossipsubControlAction::Graft {
2387                    topic_hash: topic_hash.clone(),
2388                })
2389                .collect();
2390
2391            // If there are prunes associated with the same peer add them.
2392            if let Some(topics) = to_prune.remove(peer) {
2393                let mut prunes = topics
2394                    .iter()
2395                    .map(|topic_hash| {
2396                        self.make_prune(
2397                            topic_hash,
2398                            peer,
2399                            self.config.do_px() && !no_px.contains(peer),
2400                        )
2401                    })
2402                    .collect::<Vec<_>>();
2403                control_msgs.append(&mut prunes);
2404            }
2405
2406            // send the control messages
2407            if self
2408                .send_message(
2409                    *peer,
2410                    GossipsubRpc {
2411                        subscriptions: Vec::new(),
2412                        messages: Vec::new(),
2413                        control_msgs,
2414                    }
2415                    .into_protobuf(),
2416                )
2417                .is_err()
2418            {
2419                error!("Failed to send control messages. Message too large");
2420            }
2421        }
2422
2423        // handle the remaining prunes
2424        for (peer, topics) in to_prune.iter() {
2425            let remaining_prunes = topics
2426                .iter()
2427                .map(|topic_hash| {
2428                    self.make_prune(
2429                        topic_hash,
2430                        peer,
2431                        self.config.do_px() && !no_px.contains(peer),
2432                    )
2433                })
2434                .collect();
2435            if self
2436                .send_message(
2437                    *peer,
2438                    GossipsubRpc {
2439                        subscriptions: Vec::new(),
2440                        messages: Vec::new(),
2441                        control_msgs: remaining_prunes,
2442                    }
2443                    .into_protobuf(),
2444                )
2445                .is_err()
2446            {
2447                error!("Failed to send prune messages. Message too large");
2448            }
2449        }
2450    }
2451
2452    /// Helper function which forwards a message to mesh\[topic\] peers.
2453    ///
2454    /// Returns true if at least one peer was messaged.
2455    fn forward_msg(
2456        &mut self,
2457        msg_id: &MessageId,
2458        message: RawGossipsubMessage,
2459        propagation_source: Option<&PeerId>,
2460    ) -> Result<bool, PublishError> {
2461        // message is fully validated inform peer_score
2462        if let Some((peer_score, ..)) = &mut self.peer_score {
2463            if let Some(peer) = propagation_source {
2464                peer_score.deliver_message(peer, msg_id, &message.topic);
2465            }
2466        }
2467
2468        debug!("Forwarding message: {:?}", msg_id);
2469        let mut recipient_peers = HashSet::new();
2470
2471        // add mesh peers
2472        let topic = &message.topic;
2473        // mesh
2474        if let Some(mesh_peers) = self.mesh.get(&topic) {
2475            for peer_id in mesh_peers {
2476                if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
2477                    recipient_peers.insert(*peer_id);
2478                }
2479            }
2480        }
2481
2482        // Add explicit peers
2483        for p in &self.explicit_peers {
2484            if let Some(topics) = self.peer_topics.get(p) {
2485                if Some(p) != propagation_source
2486                    && Some(p) != message.source.as_ref()
2487                    && topics.contains(&message.topic)
2488                {
2489                    recipient_peers.insert(*p);
2490                }
2491            }
2492        }
2493
2494        // forward the message to peers
2495        if !recipient_peers.is_empty() {
2496            let event = Arc::new(
2497                GossipsubRpc {
2498                    subscriptions: Vec::new(),
2499                    messages: vec![message.clone()],
2500                    control_msgs: Vec::new(),
2501                }
2502                .into_protobuf(),
2503            );
2504
2505            for peer in recipient_peers.iter() {
2506                debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
2507                self.send_message(*peer, event.clone())?;
2508            }
2509            debug!("Completed forwarding message");
2510            Ok(true)
2511        } else {
2512            Ok(false)
2513        }
2514    }
2515
2516    /// Constructs a [`RawGossipsubMessage`] performing message signing if required.
2517    pub(crate) fn build_raw_message(
2518        &self,
2519        topic: TopicHash,
2520        data: Vec<u8>,
2521    ) -> Result<RawGossipsubMessage, PublishError> {
2522        match &self.publish_config {
2523            PublishConfig::Signing {
2524                ref keypair,
2525                author,
2526                inline_key,
2527            } => {
2528                // Build and sign the message
2529                let sequence_number: u64 = rand::random();
2530
2531                let signature = {
2532                    let message = rpc_proto::Message {
2533                        from: Some(author.clone().to_bytes()),
2534                        data: Some(data.clone()),
2535                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2536                        topic: topic.clone().into_string(),
2537                        signature: None,
2538                        key: None,
2539                    };
2540
2541                    let mut buf = Vec::with_capacity(message.encoded_len());
2542                    message
2543                        .encode(&mut buf)
2544                        .expect("Buffer has sufficient capacity");
2545
2546                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2547                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2548                    signature_bytes.extend_from_slice(&buf);
2549                    Some(keypair.sign(&signature_bytes)?)
2550                };
2551
2552                Ok(RawGossipsubMessage {
2553                    source: Some(*author),
2554                    data,
2555                    // To be interoperable with the go-implementation this is treated as a 64-bit
2556                    // big-endian uint.
2557                    sequence_number: Some(sequence_number),
2558                    topic,
2559                    signature,
2560                    key: inline_key.clone(),
2561                    validated: true, // all published messages are valid
2562                })
2563            }
2564            PublishConfig::Author(peer_id) => {
2565                Ok(RawGossipsubMessage {
2566                    source: Some(*peer_id),
2567                    data,
2568                    // To be interoperable with the go-implementation this is treated as a 64-bit
2569                    // big-endian uint.
2570                    sequence_number: Some(rand::random()),
2571                    topic,
2572                    signature: None,
2573                    key: None,
2574                    validated: true, // all published messages are valid
2575                })
2576            }
2577            PublishConfig::RandomAuthor => {
2578                Ok(RawGossipsubMessage {
2579                    source: Some(PeerId::random()),
2580                    data,
2581                    // To be interoperable with the go-implementation this is treated as a 64-bit
2582                    // big-endian uint.
2583                    sequence_number: Some(rand::random()),
2584                    topic,
2585                    signature: None,
2586                    key: None,
2587                    validated: true, // all published messages are valid
2588                })
2589            }
2590            PublishConfig::Anonymous => {
2591                Ok(RawGossipsubMessage {
2592                    source: None,
2593                    data,
2594                    // To be interoperable with the go-implementation this is treated as a 64-bit
2595                    // big-endian uint.
2596                    sequence_number: None,
2597                    topic,
2598                    signature: None,
2599                    key: None,
2600                    validated: true, // all published messages are valid
2601                })
2602            }
2603        }
2604    }
2605
2606    pub fn disconnect_peer( &mut self, peer_id: PeerId, ban: bool ) {
2607        if ban {
2608            self.blacklisted_peers.insert(peer_id, Instant::now().add(self.config.ban_peer_duration().clone()) );
2609        }
2610        self.events.push_back(NetworkBehaviourAction::DisconnectPeer {peer_id});
2611    }
2612
2613    // adds a control action to control_pool
2614    fn control_pool_add(
2615        control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
2616        peer: PeerId,
2617        control: GossipsubControlAction,
2618    ) {
2619        control_pool
2620            .entry(peer)
2621            .or_insert_with(Vec::new)
2622            .push(control);
2623    }
2624
2625    /// Takes each control action mapping and turns it into a message
2626    fn flush_control_pool(&mut self) {
2627        for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
2628            if self
2629                .send_message(
2630                    peer,
2631                    GossipsubRpc {
2632                        subscriptions: Vec::new(),
2633                        messages: Vec::new(),
2634                        control_msgs: controls,
2635                    }
2636                    .into_protobuf(),
2637                )
2638                .is_err()
2639            {
2640                error!("Failed to flush control pool. Message too large");
2641            }
2642        }
2643    }
2644
2645    /// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it
2646    /// is not already an arc.
2647    fn send_message(
2648        &mut self,
2649        peer_id: PeerId,
2650        message: impl Into<Arc<rpc_proto::Rpc>>,
2651    ) -> Result<(), PublishError> {
2652        // If the message is oversized, try and fragment it. If it cannot be fragmented, log an
2653        // error and drop the message (all individual messages should be small enough to fit in the
2654        // max_transmit_size)
2655
2656        let messages = self.fragment_message(message.into())?;
2657
2658        for message in messages {
2659            self.events
2660                .push_back(NetworkBehaviourAction::NotifyHandler {
2661                    peer_id,
2662                    event: message,
2663                    handler: NotifyHandler::Any,
2664                })
2665        }
2666        Ok(())
2667    }
2668
2669    // If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC
2670    // messages to be sent.
2671    fn fragment_message(
2672        &self,
2673        rpc: Arc<rpc_proto::Rpc>,
2674    ) -> Result<Vec<Arc<rpc_proto::Rpc>>, PublishError> {
2675        if rpc.encoded_len() < self.config.max_transmit_size() {
2676            return Ok(vec![rpc]);
2677        }
2678
2679        let new_rpc = rpc_proto::Rpc {
2680            subscriptions: Vec::new(),
2681            publish: Vec::new(),
2682            control: None,
2683        };
2684
2685        let mut rpc_list = vec![new_rpc.clone()];
2686
2687        // Gets an RPC if the object size will fit, otherwise create a new RPC. The last element
2688        // will be the RPC to add an object.
2689        macro_rules! create_or_add_rpc {
2690            ($object_size: ident ) => {
2691                let list_index = rpc_list.len() - 1; // the list is never empty
2692
2693                // create a new RPC if the new object plus 5% of its size (for length prefix
2694                // buffers) exceeds the max transmit size.
2695                if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize
2696                    > self.config.max_transmit_size()
2697                    && rpc_list[list_index] != new_rpc
2698                {
2699                    // create a new rpc and use this as the current
2700                    rpc_list.push(new_rpc.clone());
2701                }
2702            };
2703        }
2704
2705        macro_rules! add_item {
2706            ($object: ident, $type: ident ) => {
2707                let object_size = $object.encoded_len();
2708
2709                if object_size + 2 > self.config.max_transmit_size() {
2710                    // This should not be possible. All received and published messages have already
2711                    // been vetted to fit within the size.
2712                    error!("Individual message too large to fragment");
2713                    return Err(PublishError::MessageTooLarge);
2714                }
2715
2716                create_or_add_rpc!(object_size);
2717                rpc_list
2718                    .last_mut()
2719                    .expect("Must have at least one element")
2720                    .$type
2721                    .push($object.clone());
2722            };
2723        }
2724
2725        // Add messages until the limit
2726        for message in &rpc.publish {
2727            add_item!(message, publish);
2728        }
2729        for subscription in &rpc.subscriptions {
2730            add_item!(subscription, subscriptions);
2731        }
2732
2733        // handle the control messages. If all are within the max_transmit_size, send them without
2734        // fragmenting, otherwise, fragment the control messages
2735        let empty_control = rpc_proto::ControlMessage::default();
2736        if let Some(control) = rpc.control.as_ref() {
2737            if control.encoded_len() + 2 > self.config.max_transmit_size() {
2738                // fragment the RPC
2739                for ihave in &control.ihave {
2740                    let len = ihave.encoded_len();
2741                    create_or_add_rpc!(len);
2742                    rpc_list
2743                        .last_mut()
2744                        .expect("Always an element")
2745                        .control
2746                        .get_or_insert_with(|| empty_control.clone())
2747                        .ihave
2748                        .push(ihave.clone());
2749                }
2750                for iwant in &control.iwant {
2751                    let len = iwant.encoded_len();
2752                    create_or_add_rpc!(len);
2753                    rpc_list
2754                        .last_mut()
2755                        .expect("Always an element")
2756                        .control
2757                        .get_or_insert_with(|| empty_control.clone())
2758                        .iwant
2759                        .push(iwant.clone());
2760                }
2761                for graft in &control.graft {
2762                    let len = graft.encoded_len();
2763                    create_or_add_rpc!(len);
2764                    rpc_list
2765                        .last_mut()
2766                        .expect("Always an element")
2767                        .control
2768                        .get_or_insert_with(|| empty_control.clone())
2769                        .graft
2770                        .push(graft.clone());
2771                }
2772                for prune in &control.prune {
2773                    let len = prune.encoded_len();
2774                    create_or_add_rpc!(len);
2775                    rpc_list
2776                        .last_mut()
2777                        .expect("Always an element")
2778                        .control
2779                        .get_or_insert_with(|| empty_control.clone())
2780                        .prune
2781                        .push(prune.clone());
2782                }
2783            } else {
2784                let len = control.encoded_len();
2785                create_or_add_rpc!(len);
2786                rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
2787            }
2788        }
2789
2790        Ok(rpc_list.into_iter().map(Arc::new).collect())
2791    }
2792
2793    // Sending the list of the peers to the recipient
2794    fn send_peer_list(&mut self, peer_id: &PeerId) -> Result<(), PublishError> {
2795        // Creating the message.
2796        let mut ser = SimplePushSerializer::new(1);
2797
2798        let peers = self.peer_topics.keys().cloned()
2799            .filter(|p| !self.blacklisted_peers.contains_key(p) )
2800            .collect::<Vec<PeerId>>();
2801        let len = std::cmp::min(peers.len(), PEER_EXCHANGE_NUMBER_LIMIT); // Don't share too many peers. 20 should be enough
2802        ser.push_u16(len as u16);
2803
2804        for i in 0..len {
2805            ser.push_vec( &peers[i].to_bytes() );
2806        }
2807
2808        // Here is a data to populate...
2809        let data = ser.to_vec();
2810
2811        let peer_topic : Topic<IdentityHash> = Topic::new(PEER_TOPIC);
2812
2813        // Transform the data before building a raw_message.
2814        let transformed_data = self
2815            .data_transform
2816            .outbound_transform(&peer_topic.hash(), data.clone())?;
2817
2818        let raw_message = self.build_raw_message(peer_topic.into(), transformed_data)?;
2819
2820        let event = Arc::new(
2821            GossipsubRpc {
2822                subscriptions: Vec::new(),
2823                messages: vec![raw_message.clone()],
2824                control_msgs: Vec::new(),
2825            }
2826            .into_protobuf(),
2827        );
2828
2829        self.send_message(peer_id.clone(), event.clone())
2830    }
2831}
2832
2833fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
2834    addr.iter().find_map(|p| match p {
2835        Ip4(addr) => Some(IpAddr::V4(addr)),
2836        Ip6(addr) => Some(IpAddr::V6(addr)),
2837        _ => None,
2838    })
2839}
2840
2841impl<C, F> NetworkBehaviour for Gossipsub<C, F>
2842where
2843    C: Send + 'static + DataTransform,
2844    F: Send + 'static + TopicSubscriptionFilter,
2845{
2846    type ProtocolsHandler = GossipsubHandler;
2847    type OutEvent = GossipsubEvent;
2848
2849    fn new_handler(&mut self) -> Self::ProtocolsHandler {
2850        GossipsubHandler::new(
2851            self.config.protocol_id_prefix().clone(),
2852            self.config.max_transmit_size(),
2853            self.config.validation_mode().clone(),
2854            self.config.support_floodsub(),
2855        )
2856    }
2857
2858    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
2859        Vec::new()
2860    }
2861
2862    fn inject_connected(&mut self, peer_id: &PeerId) {
2863        // Ignore connections from blacklisted peers.
2864        if self.blacklisted_peers.contains_key(peer_id) {
2865            debug!("Ignoring connection from blacklisted peer: {}", peer_id);
2866            self.disconnect_peer(peer_id.clone(), true); // Disconnect it.
2867            return;
2868        }
2869
2870        info!("New peer connected: {}", peer_id);
2871        // We need to send our subscriptions to the newly-connected node.
2872        let mut subscriptions = vec![];
2873        for topic_hash in self.mesh.keys() {
2874            subscriptions.push(GossipsubSubscription {
2875                topic_hash: topic_hash.clone(),
2876                action: GossipsubSubscriptionAction::Subscribe,
2877            });
2878        }
2879
2880        if !subscriptions.is_empty() {
2881            // send our subscriptions to the peer
2882            if self
2883                .send_message(
2884                    *peer_id,
2885                    GossipsubRpc {
2886                        messages: Vec::new(),
2887                        subscriptions,
2888                        control_msgs: Vec::new(),
2889                    }
2890                    .into_protobuf(),
2891                )
2892                .is_err()
2893            {
2894                error!("Failed to send subscriptions, message too large");
2895            }
2896        }
2897
2898        // Update new connected peer with known peers.
2899        if self.send_peer_list( peer_id ).is_err() {
2900            error!("Failed to send peer list on connect");
2901        }
2902
2903        // Insert an empty set of the topics of this peer until known.
2904        self.peer_topics.insert(*peer_id, Default::default());
2905
2906        // By default we assume a peer is only a floodsub peer.
2907        //
2908        // The protocol negotiation occurs once a message is sent/received. Once this happens we
2909        // update the type of peer that this is in order to determine which kind of routing should
2910        // occur.
2911        self.peer_protocols
2912            .entry(*peer_id)
2913            .or_insert(PeerKind::Floodsub);
2914
2915        if let Some((peer_score, ..)) = &mut self.peer_score {
2916            peer_score.add_peer(*peer_id);
2917        }
2918    }
2919
2920    fn inject_disconnected(&mut self, peer_id: &PeerId) {
2921        // remove from mesh, topic_peers, peer_topic and the fanout
2922        debug!("Peer disconnected: {}", peer_id);
2923        {
2924            let topics = match self.peer_topics.get(peer_id) {
2925                Some(topics) => (topics),
2926                None => {
2927                    if !self.blacklisted_peers.contains_key(peer_id) {
2928                        debug!("Disconnected node, in blacklist");
2929                    }
2930                    return;
2931                }
2932            };
2933
2934            // remove peer from all mappings
2935            for topic in topics {
2936                // check the mesh for the topic
2937                if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
2938                    // check if the peer is in the mesh and remove it
2939                    mesh_peers.remove(peer_id);
2940                }
2941
2942                // remove from topic_peers
2943                if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
2944                    if !peer_list.remove(peer_id) {
2945                        // debugging purposes
2946                        warn!(
2947                            "Disconnected node: {} not in topic_peers peer list",
2948                            peer_id
2949                        );
2950                    }
2951                } else {
2952                    warn!(
2953                        "Disconnected node: {} with topic: {:?} not in topic_peers",
2954                        &peer_id, &topic
2955                    );
2956                }
2957
2958                // remove from fanout
2959                self.fanout
2960                    .get_mut(&topic)
2961                    .map(|peers| peers.remove(peer_id));
2962            }
2963
2964            //forget px and outbound status for this peer
2965            self.px_peers.remove(peer_id);
2966            self.outbound_peers.remove(peer_id);
2967        }
2968
2969        // Remove peer from peer_topics and peer_protocols
2970        // NOTE: It is possible the peer has already been removed from all mappings if it does not
2971        // support the protocol.
2972        self.peer_topics.remove(peer_id);
2973        self.peer_protocols.remove(peer_id);
2974
2975        if let Some((peer_score, ..)) = &mut self.peer_score {
2976            peer_score.remove_peer(peer_id);
2977        }
2978    }
2979
2980    fn inject_connection_established(
2981        &mut self,
2982        peer_id: &PeerId,
2983        _: &ConnectionId,
2984        endpoint: &ConnectedPoint,
2985    ) {
2986        // We are not accepting the peers with non dalek PK identity.
2987        if self.config.accept_dalek_pk_peers_only() && peer_id.as_dalek_pubkey().is_err() {
2988            warn!("Rejecting peer {} because it doesn't identified with Dalek PK", peer_id);
2989            self.disconnect_peer(*peer_id, true); // Disconnect it.
2990            return;
2991        }
2992
2993        // Ignore connections from blacklisted peers.
2994        if self.blacklisted_peers.contains_key(peer_id) {
2995            debug!("Rejecting peer {} because it blacklisted", peer_id);
2996            self.disconnect_peer(*peer_id, true); // Disconnect it.
2997            return;
2998        }
2999
3000        // Check if the peer is an outbound peer
3001        if let ConnectedPoint::Dialer { .. } = endpoint {
3002            // Diverging from the go implementation we only want to consider a peer as outbound peer
3003            // if its first connection is outbound. To check if this connection is the first we
3004            // check if the peer isn't connected yet. This only works because the
3005            // `inject_connection_established` event for the first connection gets called immediately
3006            // before `inject_connected` gets called.
3007            if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
3008                // The first connection is outbound and it is not a peer from peer exchange => mark
3009                // it as outbound peer
3010                self.outbound_peers.insert(*peer_id);
3011            }
3012        }
3013
3014        // Add the IP to the peer scoring system
3015        if let Some((peer_score, ..)) = &mut self.peer_score {
3016            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
3017                peer_score.add_ip(&peer_id, ip);
3018            } else {
3019                trace!(
3020                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
3021                    peer_id,
3022                    endpoint
3023                )
3024            }
3025        }
3026    }
3027
3028    fn inject_connection_closed(
3029        &mut self,
3030        peer: &PeerId,
3031        _: &ConnectionId,
3032        endpoint: &ConnectedPoint,
3033    ) {
3034        // Remove IP from peer scoring system
3035        if let Some((peer_score, ..)) = &mut self.peer_score {
3036            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
3037                peer_score.remove_ip(peer, &ip);
3038            } else {
3039                trace!(
3040                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
3041                    peer,
3042                    endpoint
3043                )
3044            }
3045        }
3046    }
3047
3048    fn inject_address_change(
3049        &mut self,
3050        peer: &PeerId,
3051        _: &ConnectionId,
3052        endpoint_old: &ConnectedPoint,
3053        endpoint_new: &ConnectedPoint,
3054    ) {
3055        // Exchange IP in peer scoring system
3056        if let Some((peer_score, ..)) = &mut self.peer_score {
3057            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3058                peer_score.remove_ip(peer, &ip);
3059            } else {
3060                trace!(
3061                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
3062                    peer,
3063                    endpoint_old
3064                )
3065            }
3066            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3067                peer_score.add_ip(&peer, ip);
3068            } else {
3069                trace!(
3070                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
3071                    peer,
3072                    endpoint_new
3073                )
3074            }
3075        }
3076    }
3077
3078    fn inject_event(
3079        &mut self,
3080        propagation_source: PeerId,
3081        _: ConnectionId,
3082        handler_event: HandlerEvent,
3083    ) {
3084        match handler_event {
3085            HandlerEvent::PeerKind(kind) => {
3086                // We have identified the protocol this peer is using
3087                if let PeerKind::NotSupported = kind {
3088                    debug!(
3089                        "Peer does not support gossipsub protocols. {}",
3090                        propagation_source
3091                    );
3092                    // We treat this peer as disconnected
3093                    self.inject_disconnected(&propagation_source);
3094                } else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) {
3095                    // Only change the value if the old value is Floodsub (the default set in
3096                    // inject_connected). All other PeerKind changes are ignored.
3097                    debug!(
3098                        "New peer type found: {} for peer: {}",
3099                        kind, propagation_source
3100                    );
3101                    if let PeerKind::Floodsub = *old_kind {
3102                        *old_kind = kind;
3103                    }
3104                }
3105            }
3106            HandlerEvent::Message {
3107                rpc,
3108                invalid_messages,
3109            } => {
3110                // Handle the gossipsub RPC
3111
3112                // Handle subscriptions
3113                // Update connected peers topics
3114                if !rpc.subscriptions.is_empty() {
3115                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3116                }
3117
3118                // Check if peer is graylisted in which case we ignore the event
3119                if let (true, _) =
3120                    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3121                {
3122                    debug!("RPC Dropped from greylisted peer {}", propagation_source);
3123                    return;
3124                }
3125
3126                // Handle any invalid messages from this peer
3127                if self.peer_score.is_some() {
3128                    for (raw_message, validation_error) in invalid_messages {
3129                        self.handle_invalid_message(
3130                            &propagation_source,
3131                            raw_message,
3132                            validation_error,
3133                        )
3134                    }
3135                } else {
3136                    // log the invalid messages
3137                    for (message, validation_error) in invalid_messages {
3138                        warn!(
3139                            "Invalid message. Reason: {:?} propagation_peer {} source {:?}",
3140                            validation_error,
3141                            propagation_source.to_string(),
3142                            message.source
3143                        );
3144                    }
3145                }
3146
3147                // Handle messages
3148                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3149                    // Only process the amount of messages the configuration allows.
3150                    if self.config.max_messages_per_rpc().is_some()
3151                        && Some(count) >= self.config.max_messages_per_rpc()
3152                    {
3153                        warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3154                        break;
3155                    }
3156                    self.handle_received_message(raw_message, &propagation_source);
3157                }
3158
3159                // Handle control messages
3160                // group some control messages, this minimises SendEvents (code is simplified to handle each event at a time however)
3161                let mut ihave_msgs = vec![];
3162                let mut graft_msgs = vec![];
3163                let mut prune_msgs = vec![];
3164                for control_msg in rpc.control_msgs {
3165                    match control_msg {
3166                        GossipsubControlAction::IHave {
3167                            topic_hash,
3168                            message_ids,
3169                        } => {
3170                            ihave_msgs.push((topic_hash, message_ids));
3171                        }
3172                        GossipsubControlAction::IWant { message_ids } => {
3173                            self.handle_iwant(&propagation_source, message_ids)
3174                        }
3175                        GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
3176                        GossipsubControlAction::Prune {
3177                            topic_hash,
3178                            peers,
3179                            backoff,
3180                        } => prune_msgs.push((topic_hash, peers, backoff)),
3181                    }
3182                }
3183                if !ihave_msgs.is_empty() {
3184                    self.handle_ihave(&propagation_source, ihave_msgs);
3185                }
3186                if !graft_msgs.is_empty() {
3187                    self.handle_graft(&propagation_source, graft_msgs);
3188                }
3189                if !prune_msgs.is_empty() {
3190                    self.handle_prune(&propagation_source, prune_msgs);
3191                }
3192            }
3193        }
3194    }
3195
3196    fn poll(
3197        &mut self,
3198        cx: &mut Context<'_>,
3199        _: &mut impl PollParameters,
3200    ) -> Poll<
3201        NetworkBehaviourAction<
3202            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
3203            Self::OutEvent,
3204        >,
3205    > {
3206        if let Some(event) = self.events.pop_front() {
3207            return Poll::Ready(match event {
3208                NetworkBehaviourAction::NotifyHandler {
3209                    peer_id,
3210                    handler,
3211                    event: send_event,
3212                } => {
3213                    // clone send event reference if others references are present
3214                    let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone());
3215                    NetworkBehaviourAction::NotifyHandler {
3216                        peer_id,
3217                        event,
3218                        handler,
3219                    }
3220                }
3221                NetworkBehaviourAction::GenerateEvent(e) => {
3222                    NetworkBehaviourAction::GenerateEvent(e)
3223                }
3224                NetworkBehaviourAction::DialAddress { address } => {
3225                    NetworkBehaviourAction::DialAddress { address }
3226                }
3227                NetworkBehaviourAction::DialPeer { peer_id, condition } => {
3228                    NetworkBehaviourAction::DialPeer { peer_id, condition }
3229                }
3230                NetworkBehaviourAction::ReportObservedAddr { address, score } => {
3231                    NetworkBehaviourAction::ReportObservedAddr { address, score }
3232                }
3233                NetworkBehaviourAction::DisconnectPeer { peer_id } => {
3234                    NetworkBehaviourAction::DisconnectPeer { peer_id }
3235                }
3236            });
3237        }
3238
3239        // update scores
3240        if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
3241            while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) {
3242                peer_score.refresh_scores();
3243            }
3244        }
3245
3246        while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
3247            self.heartbeat();
3248        }
3249
3250        Poll::Pending
3251    }
3252}
3253
3254/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3255/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
3256/// that gets as input the number of filtered peers.
3257fn get_random_peers_dynamic(
3258    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
3259    peer_protocols: &HashMap<PeerId, PeerKind>,
3260    topic_hash: &TopicHash,
3261    // maps the number of total peers to the number of selected peers
3262    n_map: impl Fn(usize) -> usize,
3263    mut f: impl FnMut(&PeerId) -> bool,
3264) -> BTreeSet<PeerId> {
3265    let mut gossip_peers = match topic_peers.get(topic_hash) {
3266        // if they exist, filter the peers by `f`
3267        Some(peer_list) => peer_list
3268            .iter()
3269            .cloned()
3270            .filter(|p| {
3271                f(p) && match peer_protocols.get(p) {
3272                    Some(PeerKind::Gossipsub) => true,
3273                    Some(PeerKind::Gossipsubv1_1) => true,
3274                    _ => false,
3275                }
3276            })
3277            .collect(),
3278        None => Vec::new(),
3279    };
3280
3281    // if we have less than needed, return them
3282    let n = n_map(gossip_peers.len());
3283    if gossip_peers.len() <= n {
3284        debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3285        return gossip_peers.into_iter().collect();
3286    }
3287
3288    // we have more peers than needed, shuffle them and return n of them
3289    let mut rng = thread_rng();
3290    gossip_peers.partial_shuffle(&mut rng, n);
3291
3292    debug!("RANDOM PEERS: Got {:?} peers", n);
3293
3294    gossip_peers.into_iter().take(n).collect()
3295}
3296
3297/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3298/// filtered by the function `f`.
3299fn get_random_peers(
3300    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
3301    peer_protocols: &HashMap<PeerId, PeerKind>,
3302    topic_hash: &TopicHash,
3303    n: usize,
3304    f: impl FnMut(&PeerId) -> bool,
3305) -> BTreeSet<PeerId> {
3306    get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
3307}
3308
3309/// Validates the combination of signing, privacy and message validation to ensure the
3310/// configuration will not reject published messages.
3311fn validate_config(
3312    authenticity: &MessageAuthenticity,
3313    validation_mode: &ValidationMode,
3314) -> Result<(), &'static str> {
3315    match validation_mode {
3316        ValidationMode::Anonymous => {
3317            if authenticity.is_signing() {
3318                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3319            }
3320
3321            if !authenticity.is_anonymous() {
3322                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3323            }
3324        }
3325        ValidationMode::Strict => {
3326            if !authenticity.is_signing() {
3327                return Err(
3328                    "Messages will be
3329                published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3330                the validation or privacy settings in the config"
3331                );
3332            }
3333        }
3334        _ => {}
3335    }
3336    Ok(())
3337}
3338
3339impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F> {
3340    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3341        f.debug_struct("Gossipsub")
3342            .field("config", &self.config)
3343            .field("events", &self.events)
3344            .field("control_pool", &self.control_pool)
3345            .field("publish_config", &self.publish_config)
3346            .field("topic_peers", &self.topic_peers)
3347            .field("peer_topics", &self.peer_topics)
3348            .field("explicit_peers", &self.explicit_peers)
3349            .field("blacklisted_peers", &self.blacklisted_peers)
3350            .field("mesh", &self.mesh)
3351            .field("fanout", &self.fanout)
3352            .field("fanout_last_pub", &self.fanout_last_pub)
3353            .field("mcache", &self.mcache)
3354            .field("heartbeat", &self.heartbeat)
3355            .field("px_peers", &self.px_peers)
3356            .field("outbound_peers", &self.outbound_peers)
3357            .field("count_received_ihave", &self.count_received_ihave)
3358            .field("count_sent_iwant", &self.count_sent_iwant)
3359            .finish()
3360    }
3361}
3362
3363impl fmt::Debug for PublishConfig {
3364    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3365        match self {
3366            PublishConfig::Signing { author, .. } => {
3367                f.write_fmt(format_args!("PublishConfig::Signing({})", author))
3368            }
3369            PublishConfig::Author(author) => {
3370                f.write_fmt(format_args!("PublishConfig::Author({})", author))
3371            }
3372            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3373            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3374        }
3375    }
3376}
3377
#[cfg(test)]
mod local_test {
    use super::*;
    use crate::IdentTopic;
    use asynchronous_codec::Encoder;
    use quickcheck::*;
    use rand::Rng;

    /// An RPC without subscriptions, messages or control messages.
    fn empty_rpc() -> GossipsubRpc {
        GossipsubRpc {
            subscriptions: vec![],
            messages: vec![],
            control_msgs: vec![],
        }
    }

    /// An unsigned, unvalidated message with a random source and 100 zero bytes of data.
    fn test_message() -> RawGossipsubMessage {
        let source = Some(PeerId::random());
        RawGossipsubMessage {
            source,
            data: vec![0; 100],
            sequence_number: None,
            topic: TopicHash::from_raw("test_topic"),
            signature: None,
            key: None,
            validated: false,
        }
    }

    /// A subscribe action for the topic "TestTopic".
    fn test_subscription() -> GossipsubSubscription {
        let topic_hash = IdentTopic::new("TestTopic").hash();
        GossipsubSubscription {
            action: GossipsubSubscriptionAction::Subscribe,
            topic_hash,
        }
    }

    /// An IHAVE control message for the topic "TestTopic" carrying five identical ids.
    fn test_control() -> GossipsubControlAction {
        let topic_hash = IdentTopic::new("TestTopic").hash();
        GossipsubControlAction::IHave {
            topic_hash,
            message_ids: vec![MessageId(vec![12u8]); 5],
        }
    }

    /// A gossipsub behaviour with permissive validation, a random author and the given
    /// maximum transmit size.
    fn gossipsub_with_limit(max_transmit_size: usize) -> Gossipsub {
        let config = crate::GossipsubConfigBuilder::default()
            .max_transmit_size(max_transmit_size)
            .validation_mode(ValidationMode::Permissive)
            .build()
            .unwrap();
        Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap()
    }

    impl Arbitrary for GossipsubRpc {
        /// Fills an empty RPC with a random amount (0 to 9 each) of test subscriptions,
        /// test messages and test control messages.
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let mut rpc = empty_rpc();

            rpc.subscriptions
                .extend((0..g.gen_range(0, 10)).map(|_| test_subscription()));
            rpc.messages
                .extend((0..g.gen_range(0, 10)).map(|_| test_message()));
            rpc.control_msgs
                .extend((0..g.gen_range(0, 10)).map(|_| test_control()));

            rpc
        }
    }

    /// Tests RPC message fragmentation
    #[test]
    fn test_message_fragmentation_deterministic() {
        let max_transmit_size = 500;
        let gs = gossipsub_with_limit(max_transmit_size);

        // A single message stays under the limit and must come back untouched.
        let mut rpc = empty_rpc();
        rpc.messages.push(test_message());

        let mut rpc_proto = rpc.clone().into_protobuf();
        let unfragmented = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap();
        assert_eq!(
            unfragmented,
            vec![Arc::new(rpc_proto.clone())],
            "Messages under the limit shouldn't be fragmented"
        );

        // Grow the RPC until it reaches the limit; it then has to be split.
        while rpc_proto.encoded_len() < max_transmit_size {
            rpc.messages.push(test_message());
            rpc_proto = rpc.clone().into_protobuf();
        }

        let fragments = gs
            .fragment_message(Arc::new(rpc_proto))
            .expect("Should be able to fragment the messages");

        assert!(fragments.len() > 1, "the message should be fragmented");

        // every fragment has to be under the limit
        for fragment in fragments {
            assert!(
                fragment.encoded_len() < max_transmit_size,
                "all messages should be less than the transmission size"
            );
        }
    }

    /// Property test: arbitrary RPCs are fragmented exactly when they reach the limit, and
    /// every fragment is under the limit and can be encoded.
    #[test]
    fn test_message_fragmentation() {
        fn prop(rpc: GossipsubRpc) {
            let max_transmit_size = 500;
            let gs = gossipsub_with_limit(max_transmit_size);

            let mut length_codec = unsigned_varint::codec::UviBytes::default();
            length_codec.set_max_len(max_transmit_size);
            let mut codec =
                crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);

            let rpc_proto = rpc.into_protobuf();
            let fragments = gs
                .fragment_message(Arc::new(rpc_proto.clone()))
                .expect("Messages must be valid");

            let fits_in_one = rpc_proto.encoded_len() < max_transmit_size;
            if fits_in_one {
                assert_eq!(fragments.len(), 1, "the message should not be fragmented");
            } else {
                assert!(fragments.len() > 1, "the message should be fragmented");
            }

            for fragment in fragments {
                // every fragment has to be under the limit
                let fragment_len = fragment.encoded_len();
                assert!(
                    fragment_len < max_transmit_size,
                    "all messages should be less than the transmission size: list size {} max size{}", fragment_len, max_transmit_size
                );

                // ensure they can all be encoded
                let mut buf = bytes::BytesMut::with_capacity(fragment_len);
                codec
                    .encode(Arc::try_unwrap(fragment).unwrap(), &mut buf)
                    .unwrap();
            }
        }

        QuickCheck::new()
            .max_tests(100)
            .quickcheck(prop as fn(_) -> _);
    }
}