libp2p_gossipsub/
behaviour.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use std::{
22    cmp::{max, Ordering},
23    collections::HashSet,
24    collections::VecDeque,
25    collections::{BTreeSet, HashMap},
26    fmt,
27    iter::FromIterator,
28    net::IpAddr,
29    sync::Arc,
30    task::{Context, Poll},
31    time::Duration,
32};
33
34use futures::StreamExt;
35use log::{debug, error, info, trace, warn};
36use prost::Message;
37use rand::{seq::SliceRandom, thread_rng};
38use wasm_timer::{Instant, Interval};
39
40use libp2p_core::{
41    connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
42    multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
43};
44use libp2p_swarm::{
45    DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
46    ProtocolsHandler,
47};
48
49use crate::backoff::BackoffStorage;
50use crate::config::{GossipsubConfig, ValidationMode};
51use crate::error::{PublishError, SubscriptionError, ValidationError};
52use crate::gossip_promises::GossipPromises;
53use crate::handler::{GossipsubHandler, HandlerEvent};
54use crate::mcache::MessageCache;
55use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
56use crate::protocol::SIGNING_PREFIX;
57use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
58use crate::time_cache::{DuplicateCache, TimeCache};
59use crate::topic::{Hasher, Topic, TopicHash};
60use crate::transform::{DataTransform, IdentityTransform};
61use crate::types::{
62    FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
63    GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
64};
65use crate::types::{GossipsubRpc, PeerKind};
66use crate::{rpc_proto, TopicScoreParams};
67use std::{cmp::Ordering::Equal, fmt::Debug};
68
69#[cfg(test)]
70mod tests;
71
/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
///
/// This value is converted into the behaviour's internal publishing
/// configuration when the [`Gossipsub`] struct is constructed.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`GossipsubConfig`] to allow for unsigned messages.
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Message signing is enabled. The author will be the owner of the key and the sequence number
    /// will be a random number.
    Signed(Keypair),
    /// Message signing is disabled.
    ///
    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
    /// number will be randomized.
    Author(PeerId),
    /// Message signing is disabled.
    ///
    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
    /// randomized.
    RandomAuthor,
    /// Message signing is disabled.
    ///
    /// The author of the message and the sequence numbers are excluded from the message.
    ///
    /// NOTE: Excluding these fields may make these messages invalid by other nodes who
    /// enforce validation of these fields. See [`ValidationMode`] in the [`GossipsubConfig`]
    /// for how to customise this for rust-libp2p gossipsub.  A custom `message_id`
    /// function will need to be set to prevent all messages from a peer being filtered
    /// as duplicates.
    Anonymous,
}
104
impl MessageAuthenticity {
    /// Returns true if signing is enabled.
    pub fn is_signing(&self) -> bool {
        matches!(self, MessageAuthenticity::Signed(_))
    }

    /// Returns true if this is the [`MessageAuthenticity::Anonymous`] mode.
    pub fn is_anonymous(&self) -> bool {
        matches!(self, MessageAuthenticity::Anonymous)
    }
}
115
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum GossipsubEvent {
    /// A message has been received.
    Message {
        /// The peer that forwarded us this message.
        propagation_source: PeerId,
        /// The [`MessageId`] of the message. This should be referenced by the application when
        /// validating a message (if required).
        message_id: MessageId,
        /// The decompressed message itself.
        message: GossipsubMessage,
    },
    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: TopicHash,
    },
    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: TopicHash,
    },
}
144
/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
/// for further details.
enum PublishConfig {
    /// Sign published messages with `keypair`. `author` is the corresponding [`PeerId`], and
    /// `inline_key` carries the protobuf-encoded public key when it is too large to be inlined
    /// in the message itself.
    Signing {
        keypair: Keypair,
        author: PeerId,
        inline_key: Option<Vec<u8>>,
    },
    /// Unsigned messages, all attributed to a single fixed [`PeerId`].
    Author(PeerId),
    /// Unsigned messages, attributed to a random [`PeerId`] per message.
    RandomAuthor,
    /// Unsigned messages with the author and sequence number omitted entirely.
    Anonymous,
}
157
158impl PublishConfig {
159    pub fn get_own_id(&self) -> Option<&PeerId> {
160        match self {
161            Self::Signing { author, .. } => Some(&author),
162            Self::Author(author) => Some(&author),
163            _ => None,
164        }
165    }
166}
167
168impl From<MessageAuthenticity> for PublishConfig {
169    fn from(authenticity: MessageAuthenticity) -> Self {
170        match authenticity {
171            MessageAuthenticity::Signed(keypair) => {
172                let public_key = keypair.public();
173                let key_enc = public_key.clone().into_protobuf_encoding();
174                let key = if key_enc.len() <= 42 {
175                    // The public key can be inlined in [`rpc_proto::Message::from`], so we don't include it
176                    // specifically in the [`rpc_proto::Message::key`] field.
177                    None
178                } else {
179                    // Include the protobuf encoding of the public key in the message.
180                    Some(key_enc)
181                };
182
183                PublishConfig::Signing {
184                    keypair,
185                    author: public_key.into_peer_id(),
186                    inline_key: key,
187                }
188            }
189            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
190            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
191            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
192        }
193    }
194}
195
/// Shorthand for the swarm action type this behaviour emits: out-events are
/// [`GossipsubEvent`]s and messages to connection handlers are shared
/// (`Arc`-wrapped) protobuf RPCs.
type GossipsubNetworkBehaviourAction = NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GossipsubEvent>;
197
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
/// The DataTransform trait allows applications to optionally add extra encoding/decoding
/// functionality to the underlying messages. This is intended for custom compression algorithms.
///
/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Gossipsub<
    D: DataTransform = IdentityTransform,
    F: TopicSubscriptionFilter = AllowAllSubscriptionFilter,
> {
    /// Configuration providing gossipsub performance parameters.
    config: GossipsubConfig,

    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<GossipsubNetworkBehaviourAction>,

    /// Pools non-urgent control messages between heartbeats.
    control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,

    /// Information used for publishing messages.
    publish_config: PublishConfig,

    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
    /// duplicates from being propagated to the application and on the network.
    duplicate_cache: DuplicateCache<MessageId>,

    /// A map of peers to their protocol kind. This is to identify different kinds of gossipsub
    /// peers.
    peer_protocols: HashMap<PeerId, PeerKind>,

    /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids.
    topic_peers: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// A map of all connected peers to their subscribed topics.
    peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,

    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
    /// forward messages to, outside of the scoring system.
    explicit_peers: HashSet<PeerId>,

    /// A list of peers that have been blacklisted by the user.
    /// Messages are not sent to and are rejected from these peers.
    blacklisted_peers: HashSet<PeerId>,

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Storage for backoffs.
    backoffs: BackoffStorage,

    /// Message cache for the last few heartbeats.
    mcache: MessageCache,

    /// Heartbeat interval stream.
    heartbeat: Interval,

    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
    /// clean up -- eg backoff clean up.
    heartbeat_ticks: u64,

    /// We remember all peers we found through peer exchange, since those peers are not considered
    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
    /// implementation to avoid possible love bombing attacks in PX. When disconnecting peers will
    /// be removed from this list which may result in a true outbound rediscovery.
    px_peers: HashSet<PeerId>,

    /// Set of connected outbound peers (we only consider true outbound peers found through
    /// discovery and not by PX).
    outbound_peers: HashSet<PeerId>,

    /// Stores optional peer score data together with thresholds, decay interval and gossip
    /// promises.
    peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,

    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// Short term cache for published message ids. This is used for penalizing peers sending
    /// our own messages back if the messages are anonymous or use a random author.
    published_message_ids: DuplicateCache<MessageId>,

    /// Short term cache for fast message ids mapping them to the real message ids.
    // NOTE(review): the field name carries a historical misspelling ("messsage"); renaming it
    // would touch every use site, so only the comment spelling is corrected here.
    fast_messsage_id_cache: TimeCache<FastMessageId, MessageId>,

    /// The filter used to handle message subscriptions.
    subscription_filter: F,

    /// A general transformation function that can be applied to data received from the wire before
    /// calculating the message-id and sending to the application. This is designed to allow the
    /// user to implement arbitrary topic-based compression algorithms.
    data_transform: D,
}
304
305impl<D, F> Gossipsub<D, F>
306where
307    D: DataTransform + Default,
308    F: TopicSubscriptionFilter + Default,
309{
310    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
311    /// [`GossipsubConfig`]. This has no subscription filter and uses no compression.
312    pub fn new(
313        privacy: MessageAuthenticity,
314        config: GossipsubConfig,
315    ) -> Result<Self, &'static str> {
316        Self::new_with_subscription_filter_and_transform(
317            privacy,
318            config,
319            F::default(),
320            D::default(),
321        )
322    }
323}
324
325impl<D, F> Gossipsub<D, F>
326where
327    D: DataTransform + Default,
328    F: TopicSubscriptionFilter,
329{
330    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
331    /// [`GossipsubConfig`] and a custom subscription filter.
332    pub fn new_with_subscription_filter(
333        privacy: MessageAuthenticity,
334        config: GossipsubConfig,
335        subscription_filter: F,
336    ) -> Result<Self, &'static str> {
337        Self::new_with_subscription_filter_and_transform(
338            privacy,
339            config,
340            subscription_filter,
341            D::default(),
342        )
343    }
344}
345
346impl<D, F> Gossipsub<D, F>
347where
348    D: DataTransform,
349    F: TopicSubscriptionFilter + Default,
350{
351    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
352    /// [`GossipsubConfig`] and a custom data transform.
353    pub fn new_with_transform(
354        privacy: MessageAuthenticity,
355        config: GossipsubConfig,
356        data_transform: D,
357    ) -> Result<Self, &'static str> {
358        Self::new_with_subscription_filter_and_transform(
359            privacy,
360            config,
361            F::default(),
362            data_transform,
363        )
364    }
365}
366
impl<D, F> Gossipsub<D, F>
where
    D: DataTransform,
    F: TopicSubscriptionFilter,
{
    /// Creates a [`Gossipsub`] struct given a set of parameters specified via a
    /// [`GossipsubConfig`] and a custom subscription filter and data transform.
    ///
    /// # Errors
    ///
    /// Returns an error if the combination of `privacy` and the config's validation mode is
    /// inconsistent (i.e. a locally published message would be rejected on receipt).
    pub fn new_with_subscription_filter_and_transform(
        privacy: MessageAuthenticity,
        config: GossipsubConfig,
        subscription_filter: F,
        data_transform: D,
    ) -> Result<Self, &'static str> {
        // Set up the router given the configuration settings.

        // We do not allow configurations where a published message would also be rejected if it
        // were received locally.
        validate_config(&privacy, &config.validation_mode())?;

        // Set up message publishing parameters.

        Ok(Gossipsub {
            events: VecDeque::new(),
            control_pool: HashMap::new(),
            publish_config: privacy.into(),
            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
            fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
            topic_peers: HashMap::new(),
            peer_topics: HashMap::new(),
            explicit_peers: HashSet::new(),
            blacklisted_peers: HashSet::new(),
            mesh: HashMap::new(),
            fanout: HashMap::new(),
            fanout_last_pub: HashMap::new(),
            backoffs: BackoffStorage::new(
                &config.prune_backoff(),
                config.heartbeat_interval(),
                config.backoff_slack(),
            ),
            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
            // The first heartbeat fires only after the configured initial delay.
            heartbeat: Interval::new_at(
                Instant::now() + config.heartbeat_initial_delay(),
                config.heartbeat_interval(),
            ),
            heartbeat_ticks: 0,
            px_peers: HashSet::new(),
            outbound_peers: HashSet::new(),
            peer_score: None,
            count_received_ihave: HashMap::new(),
            count_sent_iwant: HashMap::new(),
            peer_protocols: HashMap::new(),
            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
            // `config` is moved in last since the initializers above borrow from it.
            config,
            subscription_filter,
            data_transform,
        })
    }
}
425
426impl<D, F> Gossipsub<D, F>
427where
428    D: DataTransform,
429    F: TopicSubscriptionFilter,
430{
431    /// Lists the hashes of the topics we are currently subscribed to.
432    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
433        self.mesh.keys()
434    }
435
436    /// Lists all mesh peers for a certain topic hash.
437    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
438        self.mesh
439            .get(topic_hash)
440            .into_iter()
441            .map(|x| x.iter())
442            .flatten()
443    }
444
445    /// Lists all mesh peers for all topics.
446    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
447        let mut res = BTreeSet::new();
448        for peers in self.mesh.values() {
449            res.extend(peers);
450        }
451        res.into_iter()
452    }
453
454    /// Lists all known peers and their associated subscribed topics.
455    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
456        self.peer_topics
457            .iter()
458            .map(|(peer_id, topic_set)| (peer_id, topic_set.iter().collect()))
459    }
460
461    /// Lists all known peers and their associated protocol.
462    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
463        self.peer_protocols.iter()
464    }
465
466    /// Returns the gossipsub score for a given peer, if one exists.
467    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
468        self.peer_score
469            .as_ref()
470            .map(|(score, ..)| score.score(peer_id))
471    }
472
    /// Subscribe to a topic.
    ///
    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
    /// subscribed.
    ///
    /// # Errors
    ///
    /// Returns [`SubscriptionError::NotAllowed`] if the subscription filter rejects the topic, or
    /// a wrapped [`PublishError`] if sending the SUBSCRIBE announcement to a peer fails.
    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
        debug!("Subscribing to topic: {}", topic);
        let topic_hash = topic.hash();
        if !self.subscription_filter.can_subscribe(&topic_hash) {
            return Err(SubscriptionError::NotAllowed);
        }

        // Having a mesh entry for the topic is synonymous with being subscribed to it.
        if self.mesh.get(&topic_hash).is_some() {
            debug!("Topic: {} is already in the mesh.", topic);
            return Ok(false);
        }

        // send subscription request to all peers
        let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
        if !peer_list.is_empty() {
            // Encode the RPC once and share it across all sends via `Arc`.
            let event = Arc::new(
                GossipsubRpc {
                    messages: Vec::new(),
                    subscriptions: vec![GossipsubSubscription {
                        topic_hash: topic_hash.clone(),
                        action: GossipsubSubscriptionAction::Subscribe,
                    }],
                    control_msgs: Vec::new(),
                }
                .into_protobuf(),
            );

            for peer in peer_list {
                debug!("Sending SUBSCRIBE to peer: {:?}", peer);
                self.send_message(peer, event.clone())
                    .map_err(SubscriptionError::PublishError)?;
            }
        }

        // call JOIN(topic)
        // this will add new peers to the mesh for the topic
        self.join(&topic_hash);
        info!("Subscribed to topic: {}", topic);
        Ok(true)
    }
517
    /// Unsubscribes from a topic.
    ///
    /// Returns [`Ok(true)`] if we were subscribed to this topic, [`Ok(false)`] if we were not.
    ///
    /// # Errors
    ///
    /// Returns a [`PublishError`] if sending the UNSUBSCRIBE announcement to a peer fails.
    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, PublishError> {
        debug!("Unsubscribing from topic: {}", topic);
        let topic_hash = topic.hash();

        // No mesh entry for the topic means we are not subscribed to it.
        if self.mesh.get(&topic_hash).is_none() {
            debug!("Already unsubscribed from topic: {:?}", topic_hash);
            // we are not subscribed
            return Ok(false);
        }

        // announce to all peers
        let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
        if !peer_list.is_empty() {
            // Encode the RPC once and share it across all sends via `Arc`.
            let event = Arc::new(
                GossipsubRpc {
                    messages: Vec::new(),
                    subscriptions: vec![GossipsubSubscription {
                        topic_hash: topic_hash.clone(),
                        action: GossipsubSubscriptionAction::Unsubscribe,
                    }],
                    control_msgs: Vec::new(),
                }
                .into_protobuf(),
            );

            for peer in peer_list {
                debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string());
                self.send_message(peer, event.clone())?;
            }
        }

        // call LEAVE(topic)
        // this will remove the topic from the mesh
        self.leave(&topic_hash);

        info!("Unsubscribed from topic: {:?}", topic_hash);
        Ok(true)
    }
559
    /// Publishes a message on the given topic to the network.
    ///
    /// Returns the [`MessageId`] of the published message on success.
    ///
    /// # Errors
    ///
    /// Returns [`PublishError::MessageTooLarge`] if the encoded message exceeds the configured
    /// maximum transmission size, [`PublishError::Duplicate`] if the message was already
    /// published, and [`PublishError::InsufficientPeers`] if no peer could be sent the message.
    pub fn publish<H: Hasher>(
        &mut self,
        topic: Topic<H>,
        data: impl Into<Vec<u8>>,
    ) -> Result<MessageId, PublishError> {
        let data = data.into();

        // Transform the data before building a raw_message.
        let transformed_data = self
            .data_transform
            .outbound_transform(&topic.hash(), data.clone())?;

        let raw_message = self.build_raw_message(topic.into(), transformed_data)?;

        // calculate the message id from the un-transformed data
        let msg_id = self.config.message_id(&GossipsubMessage {
            source: raw_message.source,
            data, // the uncompressed form
            sequence_number: raw_message.sequence_number,
            topic: raw_message.topic.clone(),
        });

        // Encode the RPC once; it is shared across all recipient sends via `Arc`.
        let event = Arc::new(
            GossipsubRpc {
                subscriptions: Vec::new(),
                messages: vec![raw_message.clone()],
                control_msgs: Vec::new(),
            }
            .into_protobuf(),
        );

        // check that the size doesn't exceed the max transmission size
        if event.encoded_len() > self.config.max_transmit_size() {
            return Err(PublishError::MessageTooLarge);
        }

        // Check if the message has been published before.
        if self.duplicate_cache.contains(&msg_id) {
            // This message has already been seen. We don't re-publish messages that have already
            // been published on the network.
            warn!(
                "Not publishing a message that has already been published. Msg-id {}",
                msg_id
            );
            return Err(PublishError::Duplicate);
        }

        debug!("Publishing message: {:?}", msg_id);

        let topic_hash = raw_message.topic.clone();

        // If we are not flood publishing forward the message to mesh peers.
        let mesh_peers_sent =
            !self.config.flood_publish() && self.forward_msg(&msg_id, raw_message.clone(), None)?;

        let mut recipient_peers = HashSet::new();
        if let Some(set) = self.topic_peers.get(&topic_hash) {
            if self.config.flood_publish() {
                // Forward to all peers above score and all explicit peers
                recipient_peers.extend(
                    set.iter()
                        .filter(|p| {
                            self.explicit_peers.contains(*p)
                                || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
                        })
                        .cloned(),
                );
            } else {
                // Explicit peers
                for peer in &self.explicit_peers {
                    if set.contains(peer) {
                        recipient_peers.insert(*peer);
                    }
                }

                // Floodsub peers
                for (peer, kind) in &self.peer_protocols {
                    if kind == &PeerKind::Floodsub
                        && !self
                            .score_below_threshold(peer, |ts| ts.publish_threshold)
                            .0
                    {
                        recipient_peers.insert(*peer);
                    }
                }

                // Gossipsub peers
                if self.mesh.get(&topic_hash).is_none() {
                    debug!("Topic: {:?} not in the mesh", topic_hash);
                    // If we have fanout peers add them to the map.
                    if self.fanout.contains_key(&topic_hash) {
                        for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
                            recipient_peers.insert(*peer);
                        }
                    } else {
                        // We have no fanout peers, select mesh_n of them and add them to the fanout
                        let mesh_n = self.config.mesh_n();
                        let new_peers = get_random_peers(
                            &self.topic_peers,
                            &self.peer_protocols,
                            &topic_hash,
                            mesh_n,
                            {
                                |p| {
                                    !self.explicit_peers.contains(p)
                                        && !self
                                            .score_below_threshold(p, |pst| pst.publish_threshold)
                                            .0
                                }
                            },
                        );
                        // Add the new peers to the fanout and recipient peers
                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
                        for peer in new_peers {
                            debug!("Peer added to fanout: {:?}", peer);
                            recipient_peers.insert(peer);
                        }
                    }
                    // We are publishing to fanout peers - update the time we published
                    self.fanout_last_pub
                        .insert(topic_hash.clone(), Instant::now());
                }
            }
        }

        if recipient_peers.is_empty() && !mesh_peers_sent {
            return Err(PublishError::InsufficientPeers);
        }

        // If the message isn't a duplicate and we have sent it to some peers add it to the
        // duplicate cache and memcache.
        self.duplicate_cache.insert(msg_id.clone());
        self.mcache.put(&msg_id, raw_message);

        // If the message is anonymous or has a random author add it to the published message ids
        // cache.
        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
            if !self.config.allow_self_origin() {
                self.published_message_ids.insert(msg_id.clone());
            }
        }

        // Send to peers we know are subscribed to the topic.
        for peer_id in recipient_peers.iter() {
            debug!("Sending message to peer: {:?}", peer_id);
            self.send_message(*peer_id, event.clone())?;
        }

        info!("Published message: {:?}", &msg_id);
        Ok(msg_id)
    }
712
    /// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after
    /// the message got validated by the caller. Messages are stored in the [`MessageCache`] and
    /// validation is expected to be fast enough that the messages should still exist in the cache.
    /// There are three possible validation outcomes and the outcome is given in acceptance.
    ///
    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
    /// network. The `propagation_source` parameter indicates who the message was received by and
    /// will not be forwarded back to that peer.
    ///
    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
    /// and the P₄ penalty will be applied to the `propagation_source`.
    ///
    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
    /// but no P₄ penalty will be applied.
    ///
    /// This function will return true if the message was found in the cache and false if it was
    /// not in the cache anymore.
    ///
    /// This should only be called once per message.
    pub fn report_message_validation_result(
        &mut self,
        msg_id: &MessageId,
        propagation_source: &PeerId,
        acceptance: MessageAcceptance,
    ) -> Result<bool, PublishError> {
        let reject_reason = match acceptance {
            MessageAcceptance::Accept => {
                // Accepted messages are forwarded straight away; there is no reject
                // reason to report to the scoring system in this arm.
                let raw_message = match self.mcache.validate(msg_id) {
                    Some(raw_message) => raw_message.clone(),
                    None => {
                        warn!(
                            "Message not in cache. Ignoring forwarding. Message Id: {}",
                            msg_id
                        );
                        return Ok(false);
                    }
                };
                self.forward_msg(msg_id, raw_message, Some(propagation_source))?;
                return Ok(true);
            }
            MessageAcceptance::Reject => RejectReason::ValidationFailed,
            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
        };

        if let Some(raw_message) = self.mcache.remove(msg_id) {
            // Tell peer_score about reject
            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.reject_message(
                    propagation_source,
                    msg_id,
                    &raw_message.topic,
                    reject_reason,
                );
            }
            Ok(true)
        } else {
            warn!("Rejected message not in cache. Message Id: {}", msg_id);
            Ok(false)
        }
    }
773
774    /// Adds a new peer to the list of explicitly connected peers.
775    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
776        debug!("Adding explicit peer {}", peer_id);
777
778        self.explicit_peers.insert(*peer_id);
779
780        self.check_explicit_peer_connection(peer_id);
781    }
782
    /// This removes the peer from explicitly connected peers, note that this does not disconnect
    /// the peer.
    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
        debug!("Removing explicit peer {}", peer_id);
        // Only the explicit-peer bookkeeping is updated; an existing
        // connection to the peer stays open.
        self.explicit_peers.remove(peer_id);
    }
789
790    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
791    /// created by this peer will be rejected.
792    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
793        if self.blacklisted_peers.insert(*peer_id) {
794            debug!("Peer has been blacklisted: {}", peer_id);
795        }
796    }
797
798    /// Removes a peer from the blacklist if it has previously been blacklisted.
799    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
800        if self.blacklisted_peers.remove(peer_id) {
801            debug!("Peer has been removed from the blacklist: {}", peer_id);
802        }
803    }
804
    /// Activates the peer scoring system with the given parameters.
    ///
    /// Returns an error if the parameters are invalid or if peer scoring has
    /// already been activated (the scoring system can only be set once).
    pub fn with_peer_score(
        &mut self,
        params: PeerScoreParams,
        threshold: PeerScoreThresholds,
    ) -> Result<(), String> {
        // Delegate to the more general variant without a message delivery
        // time callback.
        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
    }
815
816    /// Activates the peer scoring system with the given parameters and a message delivery time
817    /// callback. Returns an error if the parameters got already set.
818    pub fn with_peer_score_and_message_delivery_time_callback(
819        &mut self,
820        params: PeerScoreParams,
821        threshold: PeerScoreThresholds,
822        callback: Option<fn(&PeerId, &TopicHash, f64)>,
823    ) -> Result<(), String> {
824        params.validate()?;
825        threshold.validate()?;
826
827        if self.peer_score.is_some() {
828            return Err("Peer score set twice".into());
829        }
830
831        let interval = Interval::new(params.decay_interval);
832        let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
833        self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
834        Ok(())
835    }
836
837    /// Sets scoring parameters for a topic.
838    ///
839    /// The [`Self::with_peer_score()`] must first be called to initialise peer scoring.
840    pub fn set_topic_params<H: Hasher>(
841        &mut self,
842        topic: Topic<H>,
843        params: TopicScoreParams,
844    ) -> Result<(), &'static str> {
845        if let Some((peer_score, ..)) = &mut self.peer_score {
846            peer_score.set_topic_params(topic.hash(), params);
847            Ok(())
848        } else {
849            Err("Peer score must be initialised with `with_peer_score()`")
850        }
851    }
852
853    /// Sets the application specific score for a peer. Returns true if scoring is active and
854    /// the peer is connected or if the score of the peer is not yet expired, false otherwise.
855    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
856        if let Some((peer_score, ..)) = &mut self.peer_score {
857            peer_score.set_application_score(peer_id, new_score)
858        } else {
859            false
860        }
861    }
862
    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
    fn join(&mut self, topic_hash: &TopicHash) {
        debug!("Running JOIN for topic: {:?}", topic_hash);

        // if we are already in the mesh, return
        if self.mesh.contains_key(topic_hash) {
            info!("JOIN: The topic is already in the mesh, ignoring JOIN");
            return;
        }

        let mut added_peers = HashSet::new();

        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
        // removing the fanout entry.
        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
            debug!(
                "JOIN: Removing peers from the fanout for topic: {:?}",
                topic_hash
            );

            // remove explicit peers, peers with negative scores, and backoffed peers
            peers = peers
                .into_iter()
                .filter(|p| {
                    !self.explicit_peers.contains(p)
                        && !self.score_below_threshold(p, |_| 0.0).0
                        && !self.backoffs.is_backoff_with_slack(topic_hash, p)
                })
                .collect();

            // Add up to mesh_n of them to the mesh
            // NOTE: These aren't randomly added, currently FIFO
            let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
            debug!(
                "JOIN: Adding {:?} peers from the fanout for topic: {:?}",
                add_peers, topic_hash
            );
            added_peers.extend(peers.iter().cloned().take(add_peers));
            self.mesh.insert(
                topic_hash.clone(),
                peers.into_iter().take(add_peers).collect(),
            );
            // remove the last published time
            self.fanout_last_pub.remove(topic_hash);
        }

        // check if we need to get more peers, which we randomly select
        if added_peers.len() < self.config.mesh_n() {
            // get the peers; exclude peers already added, explicit peers,
            // negatively-scored peers and peers currently in backoff
            let new_peers = get_random_peers(
                &self.topic_peers,
                &self.peer_protocols,
                topic_hash,
                self.config.mesh_n() - added_peers.len(),
                |peer| {
                    !added_peers.contains(peer)
                        && !self.explicit_peers.contains(peer)
                        && !self.score_below_threshold(peer, |_| 0.0).0
                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
                },
            );
            added_peers.extend(new_peers.clone());
            // add them to the mesh
            debug!(
                "JOIN: Inserting {:?} random peers into the mesh",
                new_peers.len()
            );
            let mesh_peers = self
                .mesh
                .entry(topic_hash.clone())
                .or_insert_with(Default::default);
            mesh_peers.extend(new_peers);
        }

        // GRAFT every peer that ended up in the new mesh, recording the graft
        // with the scoring system where enabled.
        for peer_id in added_peers {
            // Send a GRAFT control message
            info!("JOIN: Sending Graft message to peer: {:?}", peer_id);
            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.graft(&peer_id, topic_hash.clone());
            }
            Self::control_pool_add(
                &mut self.control_pool,
                peer_id,
                GossipsubControlAction::Graft {
                    topic_hash: topic_hash.clone(),
                },
            );
        }
        debug!("Completed JOIN for topic: {:?}", topic_hash);
    }
953
954    /// Creates a PRUNE gossipsub action.
955    fn make_prune(
956        &mut self,
957        topic_hash: &TopicHash,
958        peer: &PeerId,
959        do_px: bool,
960    ) -> GossipsubControlAction {
961        if let Some((peer_score, ..)) = &mut self.peer_score {
962            peer_score.prune(peer, topic_hash.clone());
963        }
964
965        match self.peer_protocols.get(peer) {
966            Some(PeerKind::Floodsub) => {
967                error!("Attempted to prune a Floodsub peer");
968            }
969            Some(PeerKind::Gossipsub) => {
970                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
971                return GossipsubControlAction::Prune {
972                    topic_hash: topic_hash.clone(),
973                    peers: Vec::new(),
974                    backoff: None,
975                };
976            }
977            None => {
978                error!("Attempted to Prune an unknown peer");
979            }
980            _ => {} // Gossipsub 1.1 peer perform the `Prune`
981        }
982
983        // Select peers for peer exchange
984        let peers = if do_px {
985            get_random_peers(
986                &self.topic_peers,
987                &self.peer_protocols,
988                &topic_hash,
989                self.config.prune_peers(),
990                |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
991            )
992            .into_iter()
993            .map(|p| PeerInfo { peer_id: Some(p) })
994            .collect()
995        } else {
996            Vec::new()
997        };
998
999        // update backoff
1000        self.backoffs
1001            .update_backoff(topic_hash, peer, self.config.prune_backoff());
1002
1003        GossipsubControlAction::Prune {
1004            topic_hash: topic_hash.clone(),
1005            peers,
1006            backoff: Some(self.config.prune_backoff().as_secs()),
1007        }
1008    }
1009
1010    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1011    fn leave(&mut self, topic_hash: &TopicHash) {
1012        debug!("Running LEAVE for topic {:?}", topic_hash);
1013
1014        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1015        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1016            for peer in peers {
1017                // Send a PRUNE control message
1018                info!("LEAVE: Sending PRUNE to peer: {:?}", peer);
1019                let control = self.make_prune(topic_hash, &peer, self.config.do_px());
1020                Self::control_pool_add(&mut self.control_pool, peer, control);
1021            }
1022        }
1023        debug!("Completed LEAVE for topic: {:?}", topic_hash);
1024    }
1025
1026    /// Checks if the given peer is still connected and if not dials the peer again.
1027    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1028        if !self.peer_topics.contains_key(peer_id) {
1029            // Connect to peer
1030            debug!("Connecting to explicit peer {:?}", peer_id);
1031            self.events.push_back(NetworkBehaviourAction::DialPeer {
1032                peer_id: *peer_id,
1033                condition: DialPeerCondition::Disconnected,
1034            });
1035        }
1036    }
1037
    /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the
    /// `threshold` parameter.
    ///
    /// Returns `(below_threshold, score)`; when scoring is disabled the result
    /// is always `(false, 0.0)`.
    fn score_below_threshold(
        &self,
        peer_id: &PeerId,
        threshold: impl Fn(&PeerScoreThresholds) -> f64,
    ) -> (bool, f64) {
        // Delegate to the associated-function form that takes the scoring
        // state explicitly.
        Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
    }
1047
1048    fn score_below_threshold_from_scores(
1049        peer_score: &Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
1050        peer_id: &PeerId,
1051        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1052    ) -> (bool, f64) {
1053        if let Some((peer_score, thresholds, ..)) = peer_score {
1054            let score = peer_score.score(peer_id);
1055            if score < threshold(thresholds) {
1056                return (true, score);
1057            }
1058            (false, score)
1059        } else {
1060            (false, 0.0)
1061        }
1062    }
1063
1064    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1065    /// requests it with an IWANT control message.
1066    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1067        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1068        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1069            debug!(
1070                "IHAVE: ignoring peer {:?} with score below threshold [score = {}]",
1071                peer_id, score
1072            );
1073            return;
1074        }
1075
1076        // IHAVE flood protection
1077        let peer_have = self
1078            .count_received_ihave
1079            .entry(*peer_id)
1080            .or_insert(0);
1081        *peer_have += 1;
1082        if *peer_have > self.config.max_ihave_messages() {
1083            debug!(
1084                "IHAVE: peer {} has advertised too many times ({}) within this heartbeat \
1085            interval; ignoring",
1086                peer_id, *peer_have
1087            );
1088            return;
1089        }
1090
1091        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1092            if *iasked >= self.config.max_ihave_length() {
1093                debug!(
1094                    "IHAVE: peer {} has already advertised too many messages ({}); ignoring",
1095                    peer_id, *iasked
1096                );
1097                return;
1098            }
1099        }
1100
1101        debug!("Handling IHAVE for peer: {:?}", peer_id);
1102
1103        // use a hashset to avoid duplicates efficiently
1104        let mut iwant_ids = HashSet::new();
1105
1106        for (topic, ids) in ihave_msgs {
1107            // only process the message if we are subscribed
1108            if !self.mesh.contains_key(&topic) {
1109                debug!(
1110                    "IHAVE: Ignoring IHAVE - Not subscribed to topic: {:?}",
1111                    topic
1112                );
1113                continue;
1114            }
1115
1116            for id in ids {
1117                if !self.duplicate_cache.contains(&id) {
1118                    // have not seen this message, request it
1119                    iwant_ids.insert(id);
1120                }
1121            }
1122        }
1123
1124        if !iwant_ids.is_empty() {
1125            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1126            let mut iask = iwant_ids.len();
1127            if *iasked + iask > self.config.max_ihave_length() {
1128                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1129            }
1130
1131            // Send the list of IWANT control messages
1132            debug!(
1133                "IHAVE: Asking for {} out of {} messages from {}",
1134                iask,
1135                iwant_ids.len(),
1136                peer_id
1137            );
1138
1139            //ask in random order
1140            let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
1141            let mut rng = thread_rng();
1142            iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);
1143
1144            iwant_ids_vec.truncate(iask as usize);
1145            *iasked += iask;
1146
1147            let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
1148            if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
1149                gossip_promises.add_promise(
1150                    *peer_id,
1151                    &message_ids,
1152                    Instant::now() + self.config.iwant_followup_time(),
1153                );
1154            }
1155            debug!(
1156                "IHAVE: Asking for the following messages from {}: {:?}",
1157                peer_id, message_ids
1158            );
1159
1160            Self::control_pool_add(
1161                &mut self.control_pool,
1162                *peer_id,
1163                GossipsubControlAction::IWant { message_ids },
1164            );
1165        }
1166        debug!("Completed IHAVE handling for peer: {:?}", peer_id);
1167    }
1168
1169    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1170    /// forwarded to the requesting peer.
1171    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1172        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1173        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1174            debug!(
1175                "IWANT: ignoring peer {:?} with score below threshold [score = {}]",
1176                peer_id, score
1177            );
1178            return;
1179        }
1180
1181        debug!("Handling IWANT for peer: {:?}", peer_id);
1182        // build a hashmap of available messages
1183        let mut cached_messages = HashMap::new();
1184
1185        for id in iwant_msgs {
1186            // If we have it and the IHAVE count is not above the threshold, add it do the
1187            // cached_messages mapping
1188            if let Some((msg, count)) = self.mcache.get_with_iwant_counts(&id, peer_id) {
1189                if count > self.config.gossip_retransimission() {
1190                    debug!(
1191                        "IWANT: Peer {} has asked for message {} too many times; ignoring \
1192                    request",
1193                        peer_id, &id
1194                    );
1195                } else {
1196                    cached_messages.insert(id.clone(), msg.clone());
1197                }
1198            }
1199        }
1200
1201        if !cached_messages.is_empty() {
1202            debug!("IWANT: Sending cached messages to peer: {:?}", peer_id);
1203            // Send the messages to the peer
1204            let message_list = cached_messages
1205                .into_iter()
1206                .map(|entry| entry.1)
1207                .collect();
1208            if self
1209                .send_message(
1210                    *peer_id,
1211                    GossipsubRpc {
1212                        subscriptions: Vec::new(),
1213                        messages: message_list,
1214                        control_msgs: Vec::new(),
1215                    }
1216                    .into_protobuf(),
1217                )
1218                .is_err()
1219            {
1220                error!("Failed to send cached messages. Messages too large");
1221            }
1222        }
1223        debug!("Completed IWANT handling for peer: {}", peer_id);
1224    }
1225
    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
    /// responds with PRUNE messages.
    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
        debug!("Handling GRAFT message for peer: {}", peer_id);

        // Topics for which the graft is refused; a PRUNE is sent for each.
        let mut to_prune_topics = HashSet::new();

        let mut do_px = self.config.do_px();

        // we don't GRAFT to/from explicit peers; complain loudly if this happens
        if self.explicit_peers.contains(peer_id) {
            warn!("GRAFT: ignoring request from direct peer {}", peer_id);
            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
            to_prune_topics = HashSet::from_iter(topics.into_iter());
            // but don't PX
            do_px = false
        } else {
            let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
            let now = Instant::now();
            for topic_hash in topics {
                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
                    // if the peer is already in the mesh ignore the graft
                    if peers.contains(peer_id) {
                        debug!(
                            "GRAFT: Received graft for peer {:?} that is already in topic {:?}",
                            peer_id, &topic_hash
                        );
                        continue;
                    }

                    // make sure we are not backing off that peer
                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
                    {
                        if backoff_time > now {
                            warn!(
                                "GRAFT: peer attempted graft within backoff time, penalizing {}",
                                peer_id
                            );
                            // add behavioural penalty
                            if let Some((peer_score, ..)) = &mut self.peer_score {
                                peer_score.add_penalty(peer_id, 1);

                                // check the flood cutoff: grafting again this
                                // quickly after the prune earns an extra penalty
                                let flood_cutoff = (backoff_time
                                    + self.config.graft_flood_threshold())
                                    - self.config.prune_backoff();
                                if flood_cutoff > now {
                                    //extra penalty
                                    peer_score.add_penalty(peer_id, 1);
                                }
                            }
                            //no PX
                            do_px = false;

                            to_prune_topics.insert(topic_hash.clone());
                            continue;
                        }
                    }

                    //check the score
                    if below_zero {
                        // we don't GRAFT peers with negative score
                        debug!(
                            "GRAFT: ignoring peer {:?} with negative score [score = {}, \
                        topic = {}]",
                            peer_id, score, topic_hash
                        );
                        // we do send them PRUNE however, because it's a matter of protocol correctness
                        to_prune_topics.insert(topic_hash.clone());
                        // but we won't PX to them
                        do_px = false;
                        continue;
                    }

                    // check mesh upper bound and only allow graft if the upper bound is not reached or
                    // if it is an outbound peer
                    if peers.len() >= self.config.mesh_n_high()
                        && !self.outbound_peers.contains(peer_id)
                    {
                        to_prune_topics.insert(topic_hash.clone());
                        continue;
                    }

                    // add peer to the mesh
                    info!(
                        "GRAFT: Mesh link added for peer: {:?} in topic: {:?}",
                        peer_id, &topic_hash
                    );
                    peers.insert(*peer_id);

                    // record the graft with the scoring system
                    if let Some((peer_score, ..)) = &mut self.peer_score {
                        peer_score.graft(peer_id, topic_hash);
                    }
                } else {
                    // don't do PX when there is an unknown topic to avoid leaking our peers
                    do_px = false;
                    debug!(
                        "GRAFT: Received graft for unknown topic {:?} from peer {:?}",
                        &topic_hash, peer_id
                    );
                    // spam hardening: ignore GRAFTs for unknown topics
                    continue;
                }
            }
        }

        if !to_prune_topics.is_empty() {
            // build the prune messages to send
            let prune_messages = to_prune_topics
                .iter()
                .map(|t| self.make_prune(t, peer_id, do_px))
                .collect();
            // Send the prune messages to the peer
            info!(
                "GRAFT: Not subscribed to topics -  Sending PRUNE to peer: {}",
                peer_id
            );

            if self
                .send_message(
                    *peer_id,
                    GossipsubRpc {
                        subscriptions: Vec::new(),
                        messages: Vec::new(),
                        control_msgs: prune_messages,
                    }
                    .into_protobuf(),
                )
                .is_err()
            {
                error!("Failed to send graft. Message too large");
            }
        }
        debug!("Completed GRAFT handling for peer: {}", peer_id);
    }
1361
1362    fn remove_peer_from_mesh(
1363        &mut self,
1364        peer_id: &PeerId,
1365        topic_hash: &TopicHash,
1366        backoff: Option<u64>,
1367        always_update_backoff: bool,
1368    ) {
1369        let mut update_backoff = always_update_backoff;
1370        if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1371            // remove the peer if it exists in the mesh
1372            if peers.remove(peer_id) {
1373                info!(
1374                    "PRUNE: Removing peer: {} from the mesh for topic: {}",
1375                    peer_id.to_string(),
1376                    topic_hash
1377                );
1378
1379                if let Some((peer_score, ..)) = &mut self.peer_score {
1380                    peer_score.prune(peer_id, topic_hash.clone());
1381                }
1382
1383                update_backoff = true;
1384            }
1385        }
1386        if update_backoff {
1387            let time = if let Some(backoff) = backoff {
1388                Duration::from_secs(backoff)
1389            } else {
1390                self.config.prune_backoff()
1391            };
1392            // is there a backoff specified by the peer? if so obey it.
1393            self.backoffs.update_backoff(&topic_hash, peer_id, time);
1394        }
1395    }
1396
    /// Handles PRUNE control messages. Removes peer from the mesh.
    ///
    /// Peer-exchange (PX) peers included in the PRUNE are only dialed when the
    /// pruning peer's score meets `accept_px_threshold` and `prune_peers()` is
    /// configured non-zero.
    fn handle_prune(
        &mut self,
        peer_id: &PeerId,
        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
    ) {
        debug!("Handling PRUNE message for peer: {}", peer_id);
        let (below_threshold, score) =
            self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
        for (topic_hash, px, backoff) in prune_data {
            // Always record the backoff, whether or not the peer was meshed.
            self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true);

            // Only consider PX for topics we are currently meshed in.
            if self.mesh.contains_key(&topic_hash) {
                //connect to px peers
                if !px.is_empty() {
                    // we ignore PX from peers with insufficient score
                    if below_threshold {
                        debug!(
                            "PRUNE: ignoring PX from peer {:?} with insufficient score \
                             [score ={} topic = {}]",
                            peer_id, score, topic_hash
                        );
                        continue;
                    }

                    // NOTE: We cannot dial any peers from PX currently as we typically will not
                    // know their multiaddr. Until SignedRecords are spec'd this
                    // remains a stub. By default `config.prune_peers()` is set to zero and
                    // this is skipped. If the user modifies this, this will only be able to
                    // dial already known peers (from an external discovery mechanism for
                    // example).
                    if self.config.prune_peers() > 0 {
                        self.px_connect(px);
                    }
                }
            }
        }
        debug!("Completed PRUNE handling for peer: {}", peer_id.to_string());
    }
1436
1437    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1438        let n = self.config.prune_peers();
1439        // Ignore peerInfo with no ID
1440        //
1441        //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1442        // signed peer record?
1443        px = px.into_iter().filter(|p| p.peer_id.is_some()).collect();
1444        if px.len() > n {
1445            // only use at most prune_peers many random peers
1446            let mut rng = thread_rng();
1447            px.partial_shuffle(&mut rng, n);
1448            px = px.into_iter().take(n).collect();
1449        }
1450
1451        for p in px {
1452            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1453            // it, see https://github.com/libp2p/specs/pull/217
1454            if let Some(peer_id) = p.peer_id {
1455                // mark as px peer
1456                self.px_peers.insert(peer_id);
1457
1458                // dial peer
1459                self.events.push_back(NetworkBehaviourAction::DialPeer {
1460                    peer_id,
1461                    condition: DialPeerCondition::Disconnected,
1462                });
1463            }
1464        }
1465    }
1466
    /// Applies some basic checks to whether this message is valid. Does not apply user validation
    /// checks.
    ///
    /// Returns `false` (and reports the rejection to the scoring system and
    /// any pending gossip promises, where scoring is enabled) when the
    /// forwarding peer or the original source is blacklisted, or when the
    /// message falsely claims to originate from us.
    fn message_is_valid(
        &mut self,
        msg_id: &MessageId,
        raw_message: &mut RawGossipsubMessage,
        propagation_source: &PeerId,
    ) -> bool {
        debug!(
            "Handling message: {:?} from peer: {}",
            msg_id,
            propagation_source.to_string()
        );

        // Reject any message from a blacklisted peer
        if self.blacklisted_peers.contains(propagation_source) {
            debug!(
                "Rejecting message from blacklisted peer: {}",
                propagation_source
            );
            if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
                peer_score.reject_message(
                    propagation_source,
                    msg_id,
                    &raw_message.topic,
                    RejectReason::BlackListedPeer,
                );
                gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
            }
            return false;
        }

        // Also reject any message that originated from a blacklisted peer
        if let Some(source) = raw_message.source.as_ref() {
            if self.blacklisted_peers.contains(source) {
                debug!(
                    "Rejecting message from peer {} because of blacklisted source: {}",
                    propagation_source, source
                );
                if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
                    peer_score.reject_message(
                        propagation_source,
                        msg_id,
                        &raw_message.topic,
                        RejectReason::BlackListedSource,
                    );
                    gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource);
                }
                return false;
            }
        }

        // If we are not validating messages, assume this message is validated
        // This will allow the message to be gossiped without explicitly calling
        // `validate_message`.
        if !self.config.validate_messages() {
            raw_message.validated = true;
        }

        // reject messages claiming to be from ourselves but not locally published
        // When our own id is known, a message "from us" relayed by another peer
        // is self-origin spoofing; otherwise fall back to checking the set of
        // message ids we actually published.
        let self_published = !self.config.allow_self_origin()
            && if let Some(own_id) = self.publish_config.get_own_id() {
                own_id != propagation_source
                    && raw_message.source.as_ref().map_or(false, |s| s == own_id)
            } else {
                self.published_message_ids.contains(&msg_id)
            };

        if self_published {
            debug!(
                "Dropping message {} claiming to be from self but forwarded from {}",
                msg_id, propagation_source
            );
            if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
                peer_score.reject_message(
                    propagation_source,
                    msg_id,
                    &raw_message.topic,
                    RejectReason::SelfOrigin,
                );
                gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin);
            }
            return false;
        }

        true
    }
1554
    /// Handles a newly received [`RawGossipsubMessage`].
    ///
    /// Pipeline: fast-id duplicate short-circuit -> inbound data transform ->
    /// message-id computation -> validity/blacklist checks -> duplicate cache
    /// -> score/promise bookkeeping -> memcache -> delivery to the
    /// application -> immediate forward to mesh peers (when validation is not
    /// delegated to the application).
    ///
    /// Forwards the message to all peers in the mesh.
    fn handle_received_message(
        &mut self,
        mut raw_message: RawGossipsubMessage,
        propagation_source: &PeerId,
    ) {
        // Cheap pre-check: if a fast message-id function is configured and we
        // have already seen this raw message, record the duplicate for scoring
        // and bail out without running the inbound transform again.
        let fast_message_id = self.config.fast_message_id(&raw_message);
        if let Some(fast_message_id) = fast_message_id.as_ref() {
            if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) {
                let msg_id = msg_id.clone();
                // NOTE(review): the boolean result of `message_is_valid` is
                // discarded on this fast path — only the duplicate is counted
                // for scoring below. Confirm this is intended.
                self.message_is_valid(&msg_id, &mut raw_message, propagation_source);
                if let Some((peer_score, ..)) = &mut self.peer_score {
                    peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic);
                }
                return;
            }
        }

        // Try and perform the data transform to the message. If it fails, consider it invalid.
        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
            Ok(message) => message,
            Err(e) => {
                debug!("Invalid message. Transform error: {:?}", e);
                // Reject the message and return
                self.handle_invalid_message(
                    propagation_source,
                    raw_message,
                    ValidationError::TransformFailed,
                );
                return;
            }
        };

        // Calculate the message id on the transformed data.
        let msg_id = self.config.message_id(&message);

        // Check the validity of the message
        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
        // and instead continually penalize peers that repeatedly send this message.
        // (`message_is_valid` may also mark `raw_message.validated` as a side effect.)
        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
            return;
        }

        // Add the message to the duplicate caches
        if let Some(fast_message_id) = fast_message_id {
            // add id to cache so future receipts take the fast path above
            self.fast_messsage_id_cache
                .entry(fast_message_id)
                .or_insert_with(|| msg_id.clone());
        }
        // `insert` returns false when the id was already present, i.e. this is
        // a duplicate delivery of an already-known message.
        if !self.duplicate_cache.insert(msg_id.clone()) {
            debug!(
                "Message already received, ignoring. Message: {}",
                msg_id
            );
            if let Some((peer_score, ..)) = &mut self.peer_score {
                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
            }
            return;
        }
        debug!(
            "Put message {:?} in duplicate_cache and resolve promises",
            msg_id
        );

        // Tells score that message arrived (but is maybe not fully validated yet).
        // Consider the message as delivered for gossip promises.
        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
            gossip_promises.message_delivered(&msg_id);
        }

        // Add the message to our memcache (the raw, untransformed form is
        // cached so it can be served for IWANT requests / forwarding).
        self.mcache.put(&msg_id, raw_message.clone());

        // Dispatch the message to the user if we are subscribed to any of the topics
        if self.mesh.contains_key(&message.topic) {
            debug!("Sending received message to user");
            self.events.push_back(NetworkBehaviourAction::GenerateEvent(
                GossipsubEvent::Message {
                    propagation_source: *propagation_source,
                    message_id: msg_id.clone(),
                    message,
                },
            ));
        } else {
            debug!(
                "Received message on a topic we are not subscribed to: {:?}",
                message.topic
            );
            // Not subscribed: do not forward either.
            return;
        }

        // forward the message to mesh peers, if no validation is required
        // (when `validate_messages()` is set, forwarding happens only after
        // the application explicitly validates the message)
        if !self.config.validate_messages() {
            if self
                .forward_msg(&msg_id, raw_message, Some(propagation_source))
                .is_err()
            {
                error!("Failed to forward message. Too large");
            }
            debug!("Completed message handling for message: {:?}", msg_id);
        }
    }
1661
1662    // Handles invalid messages received.
1663    fn handle_invalid_message(
1664        &mut self,
1665        propagation_source: &PeerId,
1666        raw_message: RawGossipsubMessage,
1667        validation_error: ValidationError,
1668    ) {
1669        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1670            let reason = RejectReason::ValidationError(validation_error);
1671            let fast_message_id_cache = &self.fast_messsage_id_cache;
1672            if let Some(msg_id) = self
1673                .config
1674                .fast_message_id(&raw_message)
1675                .and_then(|id| fast_message_id_cache.get(&id))
1676            {
1677                peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason);
1678                gossip_promises.reject_message(msg_id, &reason);
1679            } else {
1680                // The message is invalid, we reject it ignoring any gossip promises. If a peer is
1681                // advertising this message via an IHAVE and it's invalid it will be double
1682                // penalized, one for sending us an invalid and again for breaking a promise.
1683                peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1684            }
1685        }
1686    }
1687
    /// Handles received subscriptions.
    ///
    /// Updates `topic_peers` / `peer_topics` for each (filtered) subscription
    /// change, opportunistically grafts newly subscribed peers into
    /// under-populated meshes, sends any resulting GRAFTs immediately, and
    /// finally emits `Subscribed`/`Unsubscribed` events to the application.
    fn handle_received_subscriptions(
        &mut self,
        subscriptions: &[GossipsubSubscription],
        propagation_source: &PeerId,
    ) {
        debug!(
            "Handling subscriptions: {:?}, from source: {}",
            subscriptions,
            propagation_source.to_string()
        );

        // (peer, topic) pairs to be removed from the mesh after the loop.
        let mut unsubscribed_peers = Vec::new();

        // The peer must already be tracked (inserted on connection);
        // subscriptions from unknown peers are dropped.
        let subscribed_topics = match self.peer_topics.get_mut(propagation_source) {
            Some(topics) => topics,
            None => {
                error!(
                    "Subscription by unknown peer: {}",
                    propagation_source.to_string()
                );
                return;
            }
        };

        // Collect potential graft messages for the peer.
        let mut grafts = Vec::new();

        // Notify the application about the subscription, after the grafts are sent.
        let mut application_event = Vec::new();

        // Let the configured subscription filter drop/modify the set; an
        // error rejects the whole RPC.
        let filtered_topics = match self
            .subscription_filter
            .filter_incoming_subscriptions(subscriptions, subscribed_topics)
        {
            Ok(topics) => topics,
            Err(s) => {
                error!(
                    "Subscription filter error: {}; ignoring RPC from peer {}",
                    s,
                    propagation_source.to_string()
                );
                return;
            }
        };

        for subscription in filtered_topics {
            // get the peers from the mapping, or insert empty lists if the topic doesn't exist
            let peer_list = self
                .topic_peers
                .entry(subscription.topic_hash.clone())
                .or_insert_with(Default::default);

            match subscription.action {
                GossipsubSubscriptionAction::Subscribe => {
                    if peer_list.insert(*propagation_source) {
                        debug!(
                            "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
                            propagation_source.to_string(),
                            subscription.topic_hash
                        );
                    }

                    // add to the peer_topics mapping
                    subscribed_topics.insert(subscription.topic_hash.clone());

                    // if the mesh needs peers add the peer to the mesh
                    // Conditions: not an explicit peer, speaks a gossipsub
                    // protocol (not plain floodsub), score not below zero, and
                    // not currently in a prune backoff for this topic.
                    if !self.explicit_peers.contains(propagation_source)
                        && match self.peer_protocols.get(propagation_source) {
                            Some(PeerKind::Gossipsubv1_1) => true,
                            Some(PeerKind::Gossipsub) => true,
                            _ => false,
                        }
                        && !Self::score_below_threshold_from_scores(
                            &self.peer_score,
                            propagation_source,
                            |_| 0.0,
                        )
                        .0
                        && !self
                            .backoffs
                            .is_backoff_with_slack(&subscription.topic_hash, propagation_source)
                    {
                        if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) {
                            // Only graft while the mesh is under its low
                            // watermark; `insert` is false if already present.
                            if peers.len() < self.config.mesh_n_low()
                                && peers.insert(*propagation_source)
                            {
                                debug!(
                                    "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}",
                                    propagation_source.to_string(),
                                    subscription.topic_hash
                                );
                                // send graft to the peer
                                debug!(
                                    "Sending GRAFT to peer {} for topic {:?}",
                                    propagation_source.to_string(),
                                    subscription.topic_hash
                                );
                                if let Some((peer_score, ..)) = &mut self.peer_score {
                                    peer_score
                                        .graft(propagation_source, subscription.topic_hash.clone());
                                }
                                grafts.push(GossipsubControlAction::Graft {
                                    topic_hash: subscription.topic_hash.clone(),
                                });
                            }
                        }
                    }
                    // generates a subscription event to be polled
                    application_event.push(NetworkBehaviourAction::GenerateEvent(
                        GossipsubEvent::Subscribed {
                            peer_id: *propagation_source,
                            topic: subscription.topic_hash.clone(),
                        },
                    ));
                }
                GossipsubSubscriptionAction::Unsubscribe => {
                    if peer_list.remove(propagation_source) {
                        info!(
                            "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
                            propagation_source.to_string(),
                            subscription.topic_hash
                        );
                    }
                    // remove topic from the peer_topics mapping
                    subscribed_topics.remove(&subscription.topic_hash);
                    unsubscribed_peers
                        .push((*propagation_source, subscription.topic_hash.clone()));
                    // generate an unsubscribe event to be polled
                    application_event.push(NetworkBehaviourAction::GenerateEvent(
                        GossipsubEvent::Unsubscribed {
                            peer_id: *propagation_source,
                            topic: subscription.topic_hash.clone(),
                        },
                    ));
                }
            }
        }

        // remove unsubscribed peers from the mesh if it exists
        for (peer_id, topic_hash) in unsubscribed_peers {
            self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false);
        }

        // If we need to send grafts to peer, do so immediately, rather than waiting for the
        // heartbeat.
        if !grafts.is_empty()
            && self
                .send_message(
                    *propagation_source,
                    GossipsubRpc {
                        subscriptions: Vec::new(),
                        messages: Vec::new(),
                        control_msgs: grafts,
                    }
                    .into_protobuf(),
                )
                .is_err()
        {
            error!("Failed sending grafts. Message too large");
        }

        // Notify the application of the subscriptions
        for event in application_event {
            self.events.push_back(event);
        }

        trace!(
            "Completed handling subscriptions from source: {:?}",
            propagation_source
        );
    }
1860
1861    /// Applies penalties to peers that did not respond to our IWANT requests.
1862    fn apply_iwant_penalties(&mut self) {
1863        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1864            for (peer, count) in gossip_promises.get_broken_promises() {
1865                peer_score.add_penalty(&peer, count);
1866            }
1867        }
1868    }
1869
    /// Heartbeat function which shifts the memcache and updates the mesh.
    ///
    /// One heartbeat performs, in order: backoff/counter cleanup, IWANT
    /// penalties, explicit-peer reconnection checks, per-topic mesh
    /// maintenance (prune negative-score peers, top up to `mesh_n`, trim to
    /// `mesh_n`, keep `mesh_outbound_min` outbound peers, opportunistic
    /// grafting), fanout expiry/maintenance, gossip emission, and finally
    /// flushing the pending GRAFT/PRUNE and pooled control messages before
    /// shifting the memcache window.
    fn heartbeat(&mut self) {
        debug!("Starting heartbeat");

        self.heartbeat_ticks += 1;

        // Accumulators: peer -> topics to GRAFT/PRUNE; `no_px` marks peers
        // whose PRUNE must not carry peer-exchange info (negative score).
        let mut to_graft = HashMap::new();
        let mut to_prune = HashMap::new();
        let mut no_px = HashSet::new();

        // clean up expired backoffs
        self.backoffs.heartbeat();

        // clean up ihave counters
        self.count_sent_iwant.clear();
        self.count_received_ihave.clear();

        // apply iwant penalties
        self.apply_iwant_penalties();

        // check connections to explicit peers
        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
            for p in self.explicit_peers.clone() {
                self.check_explicit_peer_connection(&p);
            }
        }

        // cache scores throughout the heartbeat
        // `score` memoizes per-peer scores in `scores` so each peer is scored
        // at most once per heartbeat; returns 0.0 when scoring is disabled.
        let mut scores = HashMap::new();
        let peer_score = &self.peer_score;
        let mut score = |p: &PeerId| match peer_score {
            Some((peer_score, ..)) => *scores
                .entry(*p)
                .or_insert_with(|| peer_score.score(p)),
            _ => 0.0,
        };

        // maintain the mesh for each topic
        for (topic_hash, peers) in self.mesh.iter_mut() {
            // Re-borrow individual fields so the closures below don't capture
            // all of `self` while `self.mesh` is mutably borrowed.
            let explicit_peers = &self.explicit_peers;
            let backoffs = &self.backoffs;
            let topic_peers = &self.topic_peers;
            let outbound_peers = &self.outbound_peers;

            // drop all peers with negative score, without PX
            // if there is at some point a stable retain method for BTreeSet the following can be
            // written more efficiently with retain.
            let to_remove: Vec<_> = peers
                .iter()
                .filter(|&p| {
                    if score(p) < 0.0 {
                        debug!(
                            "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
                             {}]",
                            p,
                            score(p),
                            topic_hash
                        );

                        let current_topic = to_prune.entry(*p).or_insert_with(Vec::new);
                        current_topic.push(topic_hash.clone());
                        no_px.insert(*p);
                        true
                    } else {
                        false
                    }
                })
                .cloned()
                .collect();
            for peer in to_remove {
                peers.remove(&peer);
            }

            // too little peers - add some
            if peers.len() < self.config.mesh_n_low() {
                debug!(
                    "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}",
                    topic_hash,
                    peers.len(),
                    self.config.mesh_n_low()
                );
                // not enough peers - get mesh_n - current_length more
                let desired_peers = self.config.mesh_n() - peers.len();
                // Candidates: subscribed, not already meshed, not explicit,
                // not in backoff, and non-negative score.
                let peer_list = get_random_peers(
                    topic_peers,
                    &self.peer_protocols,
                    topic_hash,
                    desired_peers,
                    |peer| {
                        !peers.contains(peer)
                            && !explicit_peers.contains(peer)
                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
                            && score(peer) >= 0.0
                    },
                );
                for peer in &peer_list {
                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
                    current_topic.push(topic_hash.clone());
                }
                // update the mesh
                debug!("Updating mesh, new mesh: {:?}", peer_list);
                peers.extend(peer_list);
            }

            // too many peers - remove some
            if peers.len() > self.config.mesh_n_high() {
                debug!(
                    "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}",
                    topic_hash,
                    peers.len(),
                    self.config.mesh_n_high()
                );
                let excess_peer_no = peers.len() - self.config.mesh_n();

                // shuffle the peers and then sort by score ascending beginning with the worst
                let mut rng = thread_rng();
                let mut shuffled = peers.iter().cloned().collect::<Vec<_>>();
                shuffled.shuffle(&mut rng);
                shuffled
                    .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal));
                // shuffle everything except the last retain_scores many peers (the best ones)
                // so the removal order among non-protected peers is random.
                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);

                // count total number of outbound peers
                let mut outbound = {
                    let outbound_peers = &self.outbound_peers;
                    shuffled
                        .iter()
                        .filter(|p| outbound_peers.contains(*p))
                        .count()
                };

                // remove the first excess_peer_no allowed (by outbound restrictions) peers adding
                // them to to_prune
                let mut removed = 0;
                for peer in shuffled {
                    if removed == excess_peer_no {
                        break;
                    }
                    if self.outbound_peers.contains(&peer) {
                        if outbound <= self.config.mesh_outbound_min() {
                            //do not remove anymore outbound peers
                            continue;
                        } else {
                            //an outbound peer gets removed
                            outbound -= 1;
                        }
                    }

                    //remove the peer
                    peers.remove(&peer);
                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
                    current_topic.push(topic_hash.clone());
                    removed += 1;
                }
            }

            // do we have enough outbound peers?
            if peers.len() >= self.config.mesh_n_low() {
                // count number of outbound peers we have
                let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };

                // if we have not enough outbound peers, graft to some new outbound peers
                if outbound < self.config.mesh_outbound_min() {
                    let needed = self.config.mesh_outbound_min() - outbound;
                    let peer_list = get_random_peers(
                        topic_peers,
                        &self.peer_protocols,
                        topic_hash,
                        needed,
                        |peer| {
                            !peers.contains(peer)
                                && !explicit_peers.contains(peer)
                                && !backoffs.is_backoff_with_slack(topic_hash, peer)
                                && score(peer) >= 0.0
                                && outbound_peers.contains(peer)
                        },
                    );
                    for peer in &peer_list {
                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
                        current_topic.push(topic_hash.clone());
                    }
                    // update the mesh
                    debug!("Updating mesh, new mesh: {:?}", peer_list);
                    peers.extend(peer_list);
                }
            }

            // should we try to improve the mesh with opportunistic grafting?
            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
                && peers.len() > 1
                && self.peer_score.is_some()
            {
                if let Some((_, thresholds, _, _)) = &self.peer_score {
                    // Opportunistic grafting works as follows: we check the median score of peers
                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
                    // select a few peers at random with score over the median.
                    // The intention is to (slowly) improve an underperforming mesh by introducing
                    // good scoring peers that may have been gossiping at us. This allows us to
                    // get out of sticky situations where we are stuck with poor peers and also
                    // recover from churn of good peers.

                    // now compute the median peer score in the mesh
                    let mut peers_by_score: Vec<_> = peers.iter().collect();
                    peers_by_score
                        .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal));

                    // Median: mean of the two middle scores for even-sized
                    // lists, the middle score otherwise.
                    let middle = peers_by_score.len() / 2;
                    let median = if peers_by_score.len() % 2 == 0 {
                        (score(
                            *peers_by_score.get(middle - 1).expect(
                                "middle < vector length and middle > 0 since peers.len() > 0",
                            ),
                        ) + score(*peers_by_score.get(middle).expect("middle < vector length")))
                            * 0.5
                    } else {
                        score(*peers_by_score.get(middle).expect("middle < vector length"))
                    };

                    // if the median score is below the threshold, select a better peer (if any) and
                    // GRAFT
                    if median < thresholds.opportunistic_graft_threshold {
                        let peer_list = get_random_peers(
                            topic_peers,
                            &self.peer_protocols,
                            topic_hash,
                            self.config.opportunistic_graft_peers(),
                            |peer| {
                                !peers.contains(peer)
                                    && !explicit_peers.contains(peer)
                                    && !backoffs.is_backoff_with_slack(topic_hash, peer)
                                    && score(peer) > median
                            },
                        );
                        for peer in &peer_list {
                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
                            current_topic.push(topic_hash.clone());
                        }
                        // update the mesh
                        debug!(
                            "Opportunistically graft in topic {} with peers {:?}",
                            topic_hash, peer_list
                        );
                        peers.extend(peer_list);
                    }
                }
            }
        }

        // remove expired fanout topics
        {
            let fanout = &mut self.fanout; // help the borrow checker
            let fanout_ttl = self.config.fanout_ttl();
            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
                // Drop the fanout (and its last-publish record) once we have
                // not published to the topic within the TTL.
                if *last_pub_time + fanout_ttl < Instant::now() {
                    debug!(
                        "HEARTBEAT: Fanout topic removed due to timeout. Topic: {:?}",
                        topic_hash
                    );
                    fanout.remove(&topic_hash);
                    return false;
                }
                true
            });
        }

        // maintain fanout
        // check if our peers are still a part of the topic
        for (topic_hash, peers) in self.fanout.iter_mut() {
            let mut to_remove_peers = Vec::new();
            let publish_threshold = match &self.peer_score {
                Some((_, thresholds, _, _)) => thresholds.publish_threshold,
                _ => 0.0,
            };
            for peer in peers.iter() {
                // is the peer still subscribed to the topic?
                match self.peer_topics.get(peer) {
                    Some(topics) => {
                        if !topics.contains(&topic_hash) || score(peer) < publish_threshold {
                            debug!(
                                "HEARTBEAT: Peer removed from fanout for topic: {:?}",
                                topic_hash
                            );
                            to_remove_peers.push(*peer);
                        }
                    }
                    None => {
                        // remove if the peer has disconnected
                        to_remove_peers.push(*peer);
                    }
                }
            }
            for to_remove in to_remove_peers {
                peers.remove(&to_remove);
            }

            // not enough peers
            if peers.len() < self.config.mesh_n() {
                debug!(
                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
                    peers.len(),
                    self.config.mesh_n()
                );
                let needed_peers = self.config.mesh_n() - peers.len();
                let explicit_peers = &self.explicit_peers;
                let new_peers = get_random_peers(
                    &self.topic_peers,
                    &self.peer_protocols,
                    topic_hash,
                    needed_peers,
                    |peer| {
                        !peers.contains(peer)
                            && !explicit_peers.contains(peer)
                            // NOTE(review): this selects peers whose score is
                            // *below* the publish threshold, which contradicts
                            // the removal rule just above (such peers were
                            // just removed). Confirm whether `>=` was
                            // intended, as in the go implementation.
                            && score(peer) < publish_threshold
                    },
                );
                peers.extend(new_peers);
            }
        }

        // Trace-level score dump: force-score every known peer (populating
        // the memoization map) and log the per-mesh delivery stats.
        if self.peer_score.is_some() {
            trace!("Peer_scores: {:?}", {
                for peer in self.peer_topics.keys() {
                    score(peer);
                }
                scores
            });
            trace!("Mesh message deliveries: {:?}", {
                self.mesh
                    .iter()
                    .map(|(t, peers)| {
                        (
                            t.clone(),
                            peers
                                .iter()
                                .map(|p| {
                                    (
                                        *p,
                                        peer_score
                                            .as_ref()
                                            .expect("peer_score.is_some()")
                                            .0
                                            .mesh_message_deliveries(p, t)
                                            .unwrap_or(0.0),
                                    )
                                })
                                .collect::<HashMap<PeerId, f64>>(),
                        )
                    })
                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
            })
        }

        self.emit_gossip();

        // send graft/prunes
        // (`|` is the non-short-circuiting boolean OR; equivalent to `||` here)
        if !to_graft.is_empty() | !to_prune.is_empty() {
            self.send_graft_prune(to_graft, to_prune, no_px);
        }

        // piggyback pooled control messages
        self.flush_control_pool();

        // shift the memcache
        self.mcache.shift();

        debug!("Completed Heartbeat");
    }
2238
2239    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2240    /// and fanout peers
2241    fn emit_gossip(&mut self) {
2242        let mut rng = thread_rng();
2243        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2244            let mut message_ids = self.mcache.get_gossip_message_ids(&topic_hash);
2245            if message_ids.is_empty() {
2246                return;
2247            }
2248
2249            // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list
2250            if message_ids.len() > self.config.max_ihave_length() {
2251                // we do the truncation (with shuffling) per peer below
2252                debug!(
2253                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2254                    message_ids.len()
2255                );
2256            } else {
2257                // shuffle to emit in random order
2258                message_ids.shuffle(&mut rng);
2259            }
2260
2261            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2262            let n_map = |m| {
2263                max(
2264                    self.config.gossip_lazy(),
2265                    (self.config.gossip_factor() * m as f64) as usize,
2266                )
2267            };
2268            // get gossip_lazy random peers
2269            let to_msg_peers = get_random_peers_dynamic(
2270                &self.topic_peers,
2271                &self.peer_protocols,
2272                &topic_hash,
2273                n_map,
2274                |peer| {
2275                    !peers.contains(peer)
2276                        && !self.explicit_peers.contains(peer)
2277                        && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2278                },
2279            );
2280
2281            debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());
2282
2283            for peer in to_msg_peers {
2284                let mut peer_message_ids = message_ids.clone();
2285
2286                if peer_message_ids.len() > self.config.max_ihave_length() {
2287                    // We do this per peer so that we emit a different set for each peer.
2288                    // we have enough redundancy in the system that this will significantly increase
2289                    // the message coverage when we do truncate.
2290                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2291                    peer_message_ids.truncate(self.config.max_ihave_length());
2292                }
2293
2294                // send an IHAVE message
2295                Self::control_pool_add(
2296                    &mut self.control_pool,
2297                    peer,
2298                    GossipsubControlAction::IHave {
2299                        topic_hash: topic_hash.clone(),
2300                        message_ids: peer_message_ids,
2301                    },
2302                );
2303            }
2304        }
2305    }
2306
2307    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2308    /// messages.
2309    fn send_graft_prune(
2310        &mut self,
2311        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2312        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2313        no_px: HashSet<PeerId>,
2314    ) {
2315        // handle the grafts and overlapping prunes per peer
2316        for (peer, topics) in to_graft.iter() {
2317            for topic in topics {
2318                //inform scoring of graft
2319                if let Some((peer_score, ..)) = &mut self.peer_score {
2320                    peer_score.graft(peer, topic.clone());
2321                }
2322            }
2323            let mut control_msgs: Vec<GossipsubControlAction> = topics
2324                .iter()
2325                .map(|topic_hash| GossipsubControlAction::Graft {
2326                    topic_hash: topic_hash.clone(),
2327                })
2328                .collect();
2329
2330            // If there are prunes associated with the same peer add them.
2331            if let Some(topics) = to_prune.remove(peer) {
2332                let mut prunes = topics
2333                    .iter()
2334                    .map(|topic_hash| {
2335                        self.make_prune(
2336                            topic_hash,
2337                            peer,
2338                            self.config.do_px() && !no_px.contains(peer),
2339                        )
2340                    })
2341                    .collect::<Vec<_>>();
2342                control_msgs.append(&mut prunes);
2343            }
2344
2345            // send the control messages
2346            if self
2347                .send_message(
2348                    *peer,
2349                    GossipsubRpc {
2350                        subscriptions: Vec::new(),
2351                        messages: Vec::new(),
2352                        control_msgs,
2353                    }
2354                    .into_protobuf(),
2355                )
2356                .is_err()
2357            {
2358                error!("Failed to send control messages. Message too large");
2359            }
2360        }
2361
2362        // handle the remaining prunes
2363        for (peer, topics) in to_prune.iter() {
2364            let remaining_prunes = topics
2365                .iter()
2366                .map(|topic_hash| {
2367                    self.make_prune(
2368                        topic_hash,
2369                        peer,
2370                        self.config.do_px() && !no_px.contains(peer),
2371                    )
2372                })
2373                .collect();
2374            if self
2375                .send_message(
2376                    *peer,
2377                    GossipsubRpc {
2378                        subscriptions: Vec::new(),
2379                        messages: Vec::new(),
2380                        control_msgs: remaining_prunes,
2381                    }
2382                    .into_protobuf(),
2383                )
2384                .is_err()
2385            {
2386                error!("Failed to send prune messages. Message too large");
2387            }
2388        }
2389    }
2390
2391    /// Helper function which forwards a message to mesh\[topic\] peers.
2392    ///
2393    /// Returns true if at least one peer was messaged.
2394    fn forward_msg(
2395        &mut self,
2396        msg_id: &MessageId,
2397        message: RawGossipsubMessage,
2398        propagation_source: Option<&PeerId>,
2399    ) -> Result<bool, PublishError> {
2400        // message is fully validated inform peer_score
2401        if let Some((peer_score, ..)) = &mut self.peer_score {
2402            if let Some(peer) = propagation_source {
2403                peer_score.deliver_message(peer, msg_id, &message.topic);
2404            }
2405        }
2406
2407        debug!("Forwarding message: {:?}", msg_id);
2408        let mut recipient_peers = HashSet::new();
2409
2410        // add mesh peers
2411        let topic = &message.topic;
2412        // mesh
2413        if let Some(mesh_peers) = self.mesh.get(&topic) {
2414            for peer_id in mesh_peers {
2415                if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
2416                    recipient_peers.insert(*peer_id);
2417                }
2418            }
2419        }
2420
2421        // Add explicit peers
2422        for p in &self.explicit_peers {
2423            if let Some(topics) = self.peer_topics.get(p) {
2424                if Some(p) != propagation_source
2425                    && Some(p) != message.source.as_ref()
2426                    && topics.contains(&message.topic)
2427                {
2428                    recipient_peers.insert(*p);
2429                }
2430            }
2431        }
2432
2433        // forward the message to peers
2434        if !recipient_peers.is_empty() {
2435            let event = Arc::new(
2436                GossipsubRpc {
2437                    subscriptions: Vec::new(),
2438                    messages: vec![message.clone()],
2439                    control_msgs: Vec::new(),
2440                }
2441                .into_protobuf(),
2442            );
2443
2444            for peer in recipient_peers.iter() {
2445                debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
2446                self.send_message(*peer, event.clone())?;
2447            }
2448            debug!("Completed forwarding message");
2449            Ok(true)
2450        } else {
2451            Ok(false)
2452        }
2453    }
2454
2455    /// Constructs a [`RawGossipsubMessage`] performing message signing if required.
2456    pub(crate) fn build_raw_message(
2457        &self,
2458        topic: TopicHash,
2459        data: Vec<u8>,
2460    ) -> Result<RawGossipsubMessage, PublishError> {
2461        match &self.publish_config {
2462            PublishConfig::Signing {
2463                ref keypair,
2464                author,
2465                inline_key,
2466            } => {
2467                // Build and sign the message
2468                let sequence_number: u64 = rand::random();
2469
2470                let signature = {
2471                    let message = rpc_proto::Message {
2472                        from: Some(author.clone().to_bytes()),
2473                        data: Some(data.clone()),
2474                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2475                        topic: topic.clone().into_string(),
2476                        signature: None,
2477                        key: None,
2478                    };
2479
2480                    let mut buf = Vec::with_capacity(message.encoded_len());
2481                    message
2482                        .encode(&mut buf)
2483                        .expect("Buffer has sufficient capacity");
2484
2485                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2486                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2487                    signature_bytes.extend_from_slice(&buf);
2488                    Some(keypair.sign(&signature_bytes)?)
2489                };
2490
2491                Ok(RawGossipsubMessage {
2492                    source: Some(*author),
2493                    data,
2494                    // To be interoperable with the go-implementation this is treated as a 64-bit
2495                    // big-endian uint.
2496                    sequence_number: Some(sequence_number),
2497                    topic,
2498                    signature,
2499                    key: inline_key.clone(),
2500                    validated: true, // all published messages are valid
2501                })
2502            }
2503            PublishConfig::Author(peer_id) => {
2504                Ok(RawGossipsubMessage {
2505                    source: Some(*peer_id),
2506                    data,
2507                    // To be interoperable with the go-implementation this is treated as a 64-bit
2508                    // big-endian uint.
2509                    sequence_number: Some(rand::random()),
2510                    topic,
2511                    signature: None,
2512                    key: None,
2513                    validated: true, // all published messages are valid
2514                })
2515            }
2516            PublishConfig::RandomAuthor => {
2517                Ok(RawGossipsubMessage {
2518                    source: Some(PeerId::random()),
2519                    data,
2520                    // To be interoperable with the go-implementation this is treated as a 64-bit
2521                    // big-endian uint.
2522                    sequence_number: Some(rand::random()),
2523                    topic,
2524                    signature: None,
2525                    key: None,
2526                    validated: true, // all published messages are valid
2527                })
2528            }
2529            PublishConfig::Anonymous => {
2530                Ok(RawGossipsubMessage {
2531                    source: None,
2532                    data,
2533                    // To be interoperable with the go-implementation this is treated as a 64-bit
2534                    // big-endian uint.
2535                    sequence_number: None,
2536                    topic,
2537                    signature: None,
2538                    key: None,
2539                    validated: true, // all published messages are valid
2540                })
2541            }
2542        }
2543    }
2544
2545    // adds a control action to control_pool
2546    fn control_pool_add(
2547        control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
2548        peer: PeerId,
2549        control: GossipsubControlAction,
2550    ) {
2551        control_pool
2552            .entry(peer)
2553            .or_insert_with(Vec::new)
2554            .push(control);
2555    }
2556
2557    /// Takes each control action mapping and turns it into a message
2558    fn flush_control_pool(&mut self) {
2559        for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
2560            if self
2561                .send_message(
2562                    peer,
2563                    GossipsubRpc {
2564                        subscriptions: Vec::new(),
2565                        messages: Vec::new(),
2566                        control_msgs: controls,
2567                    }
2568                    .into_protobuf(),
2569                )
2570                .is_err()
2571            {
2572                error!("Failed to flush control pool. Message too large");
2573            }
2574        }
2575    }
2576
2577    /// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it
2578    /// is not already an arc.
2579    fn send_message(
2580        &mut self,
2581        peer_id: PeerId,
2582        message: impl Into<Arc<rpc_proto::Rpc>>,
2583    ) -> Result<(), PublishError> {
2584        // If the message is oversized, try and fragment it. If it cannot be fragmented, log an
2585        // error and drop the message (all individual messages should be small enough to fit in the
2586        // max_transmit_size)
2587
2588        let messages = self.fragment_message(message.into())?;
2589
2590        for message in messages {
2591            self.events
2592                .push_back(NetworkBehaviourAction::NotifyHandler {
2593                    peer_id,
2594                    event: message,
2595                    handler: NotifyHandler::Any,
2596                })
2597        }
2598        Ok(())
2599    }
2600
    // If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC
    // messages to be sent.
    //
    // Items are packed greedily: publish messages first, then subscriptions, then control
    // messages. A single item that exceeds `max_transmit_size` on its own is an error.
    fn fragment_message(
        &self,
        rpc: Arc<rpc_proto::Rpc>,
    ) -> Result<Vec<Arc<rpc_proto::Rpc>>, PublishError> {
        // Fast path: the RPC already fits, forward it untouched.
        if rpc.encoded_len() < self.config.max_transmit_size() {
            return Ok(vec![rpc]);
        }

        // Empty RPC skeleton; clones of this seed each new fragment.
        let new_rpc = rpc_proto::Rpc {
            subscriptions: Vec::new(),
            publish: Vec::new(),
            control: None,
        };

        let mut rpc_list = vec![new_rpc.clone()];

        // Gets an RPC if the object size will fit, otherwise create a new RPC. The last element
        // will be the RPC to add an object.
        macro_rules! create_or_add_rpc {
            ($object_size: ident ) => {
                let list_index = rpc_list.len() - 1; // the list is never empty

                // create a new RPC if the new object plus 5% of its size (for length prefix
                // buffers) exceeds the max transmit size.
                if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize
                    > self.config.max_transmit_size()
                    && rpc_list[list_index] != new_rpc
                {
                    // create a new rpc and use this as the current
                    rpc_list.push(new_rpc.clone());
                }
            };
        };

        // Appends `$object` to the `$type` list of the current (last) fragment, starting a
        // fresh fragment first if it would not fit. Errors out on unsplittable items.
        macro_rules! add_item {
            ($object: ident, $type: ident ) => {
                let object_size = $object.encoded_len();

                if object_size + 2 > self.config.max_transmit_size() {
                    // This should not be possible. All received and published messages have already
                    // been vetted to fit within the size.
                    error!("Individual message too large to fragment");
                    return Err(PublishError::MessageTooLarge);
                }

                create_or_add_rpc!(object_size);
                rpc_list
                    .last_mut()
                    .expect("Must have at least one element")
                    .$type
                    .push($object.clone());
            };
        }

        // Add messages until the limit
        for message in &rpc.publish {
            add_item!(message, publish);
        }
        for subscription in &rpc.subscriptions {
            add_item!(subscription, subscriptions);
        }

        // handle the control messages. If all are within the max_transmit_size, send them without
        // fragmenting, otherwise, fragment the control messages
        let empty_control = rpc_proto::ControlMessage::default();
        if let Some(control) = rpc.control.as_ref() {
            if control.encoded_len() + 2 > self.config.max_transmit_size() {
                // fragment the RPC: distribute the ihave/iwant/graft/prune lists item by item
                // across fragments, lazily creating a control message on each fragment.
                for ihave in &control.ihave {
                    let len = ihave.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .ihave
                        .push(ihave.clone());
                }
                for iwant in &control.iwant {
                    let len = iwant.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .iwant
                        .push(iwant.clone());
                }
                for graft in &control.graft {
                    let len = graft.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .graft
                        .push(graft.clone());
                }
                for prune in &control.prune {
                    let len = prune.encoded_len();
                    create_or_add_rpc!(len);
                    rpc_list
                        .last_mut()
                        .expect("Always an element")
                        .control
                        .get_or_insert_with(|| empty_control.clone())
                        .prune
                        .push(prune.clone());
                }
            } else {
                // The control message fits as a whole; attach it to a fragment unsplit.
                let len = control.encoded_len();
                create_or_add_rpc!(len);
                rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
            }
        }

        Ok(rpc_list.into_iter().map(Arc::new).collect())
    }
2724}
2725
2726fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
2727    addr.iter().find_map(|p| match p {
2728        Ip4(addr) => Some(IpAddr::V4(addr)),
2729        Ip6(addr) => Some(IpAddr::V6(addr)),
2730        _ => None,
2731    })
2732}
2733
2734impl<C, F> NetworkBehaviour for Gossipsub<C, F>
2735where
2736    C: Send + 'static + DataTransform,
2737    F: Send + 'static + TopicSubscriptionFilter,
2738{
2739    type ProtocolsHandler = GossipsubHandler;
2740    type OutEvent = GossipsubEvent;
2741
2742    fn new_handler(&mut self) -> Self::ProtocolsHandler {
2743        GossipsubHandler::new(
2744            self.config.protocol_id_prefix().clone(),
2745            self.config.max_transmit_size(),
2746            self.config.validation_mode().clone(),
2747            self.config.support_floodsub(),
2748        )
2749    }
2750
2751    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
2752        Vec::new()
2753    }
2754
2755    fn inject_connected(&mut self, peer_id: &PeerId) {
2756        // Ignore connections from blacklisted peers.
2757        if self.blacklisted_peers.contains(peer_id) {
2758            debug!("Ignoring connection from blacklisted peer: {}", peer_id);
2759            return;
2760        }
2761
2762        info!("New peer connected: {}", peer_id);
2763        // We need to send our subscriptions to the newly-connected node.
2764        let mut subscriptions = vec![];
2765        for topic_hash in self.mesh.keys() {
2766            subscriptions.push(GossipsubSubscription {
2767                topic_hash: topic_hash.clone(),
2768                action: GossipsubSubscriptionAction::Subscribe,
2769            });
2770        }
2771
2772        if !subscriptions.is_empty() {
2773            // send our subscriptions to the peer
2774            if self
2775                .send_message(
2776                    *peer_id,
2777                    GossipsubRpc {
2778                        messages: Vec::new(),
2779                        subscriptions,
2780                        control_msgs: Vec::new(),
2781                    }
2782                    .into_protobuf(),
2783                )
2784                .is_err()
2785            {
2786                error!("Failed to send subscriptions, message too large");
2787            }
2788        }
2789
2790        // Insert an empty set of the topics of this peer until known.
2791        self.peer_topics.insert(*peer_id, Default::default());
2792
2793        // By default we assume a peer is only a floodsub peer.
2794        //
2795        // The protocol negotiation occurs once a message is sent/received. Once this happens we
2796        // update the type of peer that this is in order to determine which kind of routing should
2797        // occur.
2798        self.peer_protocols
2799            .entry(*peer_id)
2800            .or_insert(PeerKind::Floodsub);
2801
2802        if let Some((peer_score, ..)) = &mut self.peer_score {
2803            peer_score.add_peer(*peer_id);
2804        }
2805    }
2806
    fn inject_disconnected(&mut self, peer_id: &PeerId) {
        // remove from mesh, topic_peers, peer_topic and the fanout
        debug!("Peer disconnected: {}", peer_id);
        // Inner scope: `topics` borrows `self.peer_topics` immutably while the other
        // (disjoint) field maps are mutated; the borrow must end before the removals below.
        {
            let topics = match self.peer_topics.get(peer_id) {
                Some(topics) => (topics),
                None => {
                    // Blacklisted peers were never registered, so their absence is expected.
                    if !self.blacklisted_peers.contains(peer_id) {
                        debug!("Disconnected node, not in connected nodes");
                    }
                    return;
                }
            };

            // remove peer from all mappings
            for topic in topics {
                // check the mesh for the topic
                if let Some(mesh_peers) = self.mesh.get_mut(&topic) {
                    // check if the peer is in the mesh and remove it
                    mesh_peers.remove(peer_id);
                }

                // remove from topic_peers
                if let Some(peer_list) = self.topic_peers.get_mut(&topic) {
                    if !peer_list.remove(peer_id) {
                        // debugging purposes: peer_topics and topic_peers should stay in sync
                        warn!(
                            "Disconnected node: {} not in topic_peers peer list",
                            peer_id
                        );
                    }
                } else {
                    warn!(
                        "Disconnected node: {} with topic: {:?} not in topic_peers",
                        &peer_id, &topic
                    );
                }

                // remove from fanout
                self.fanout
                    .get_mut(&topic)
                    .map(|peers| peers.remove(peer_id));
            }

            //forget px and outbound status for this peer
            self.px_peers.remove(peer_id);
            self.outbound_peers.remove(peer_id);
        }

        // Remove peer from peer_topics and peer_protocols
        // NOTE: It is possible the peer has already been removed from all mappings if it does not
        // support the protocol.
        self.peer_topics.remove(peer_id);
        self.peer_protocols.remove(peer_id);

        // Finally, drop the peer from the scoring system if scoring is enabled.
        if let Some((peer_score, ..)) = &mut self.peer_score {
            peer_score.remove_peer(peer_id);
        }
    }
2866
2867    fn inject_connection_established(
2868        &mut self,
2869        peer_id: &PeerId,
2870        _: &ConnectionId,
2871        endpoint: &ConnectedPoint,
2872    ) {
2873        // Ignore connections from blacklisted peers.
2874        if self.blacklisted_peers.contains(peer_id) {
2875            return;
2876        }
2877
2878        // Check if the peer is an outbound peer
2879        if let ConnectedPoint::Dialer { .. } = endpoint {
2880            // Diverging from the go implementation we only want to consider a peer as outbound peer
2881            // if its first connection is outbound. To check if this connection is the first we
2882            // check if the peer isn't connected yet. This only works because the
2883            // `inject_connection_established` event for the first connection gets called immediately
2884            // before `inject_connected` gets called.
2885            if !self.peer_topics.contains_key(peer_id) && !self.px_peers.contains(peer_id) {
2886                // The first connection is outbound and it is not a peer from peer exchange => mark
2887                // it as outbound peer
2888                self.outbound_peers.insert(*peer_id);
2889            }
2890        }
2891
2892        // Add the IP to the peer scoring system
2893        if let Some((peer_score, ..)) = &mut self.peer_score {
2894            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2895                peer_score.add_ip(&peer_id, ip);
2896            } else {
2897                trace!(
2898                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2899                    peer_id,
2900                    endpoint
2901                )
2902            }
2903        }
2904    }
2905
2906    fn inject_connection_closed(
2907        &mut self,
2908        peer: &PeerId,
2909        _: &ConnectionId,
2910        endpoint: &ConnectedPoint,
2911    ) {
2912        // Remove IP from peer scoring system
2913        if let Some((peer_score, ..)) = &mut self.peer_score {
2914            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2915                peer_score.remove_ip(peer, &ip);
2916            } else {
2917                trace!(
2918                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2919                    peer,
2920                    endpoint
2921                )
2922            }
2923        }
2924    }
2925
2926    fn inject_address_change(
2927        &mut self,
2928        peer: &PeerId,
2929        _: &ConnectionId,
2930        endpoint_old: &ConnectedPoint,
2931        endpoint_new: &ConnectedPoint,
2932    ) {
2933        // Exchange IP in peer scoring system
2934        if let Some((peer_score, ..)) = &mut self.peer_score {
2935            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
2936                peer_score.remove_ip(peer, &ip);
2937            } else {
2938                trace!(
2939                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2940                    peer,
2941                    endpoint_old
2942                )
2943            }
2944            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
2945                peer_score.add_ip(&peer, ip);
2946            } else {
2947                trace!(
2948                    "Couldn't extract ip from endpoint of peer {} with endpoint {:?}",
2949                    peer,
2950                    endpoint_new
2951                )
2952            }
2953        }
2954    }
2955
2956    fn inject_event(
2957        &mut self,
2958        propagation_source: PeerId,
2959        _: ConnectionId,
2960        handler_event: HandlerEvent,
2961    ) {
2962        match handler_event {
2963            HandlerEvent::PeerKind(kind) => {
2964                // We have identified the protocol this peer is using
2965                if let PeerKind::NotSupported = kind {
2966                    debug!(
2967                        "Peer does not support gossipsub protocols. {}",
2968                        propagation_source
2969                    );
2970                    // We treat this peer as disconnected
2971                    self.inject_disconnected(&propagation_source);
2972                } else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) {
2973                    // Only change the value if the old value is Floodsub (the default set in
2974                    // inject_connected). All other PeerKind changes are ignored.
2975                    debug!(
2976                        "New peer type found: {} for peer: {}",
2977                        kind, propagation_source
2978                    );
2979                    if let PeerKind::Floodsub = *old_kind {
2980                        *old_kind = kind;
2981                    }
2982                }
2983            }
2984            HandlerEvent::Message {
2985                rpc,
2986                invalid_messages,
2987            } => {
2988                // Handle the gossipsub RPC
2989
2990                // Handle subscriptions
2991                // Update connected peers topics
2992                if !rpc.subscriptions.is_empty() {
2993                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
2994                }
2995
2996                // Check if peer is graylisted in which case we ignore the event
2997                if let (true, _) =
2998                    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
2999                {
3000                    debug!("RPC Dropped from greylisted peer {}", propagation_source);
3001                    return;
3002                }
3003
3004                // Handle any invalid messages from this peer
3005                if self.peer_score.is_some() {
3006                    for (raw_message, validation_error) in invalid_messages {
3007                        self.handle_invalid_message(
3008                            &propagation_source,
3009                            raw_message,
3010                            validation_error,
3011                        )
3012                    }
3013                } else {
3014                    // log the invalid messages
3015                    for (message, validation_error) in invalid_messages {
3016                        warn!(
3017                            "Invalid message. Reason: {:?} propagation_peer {} source {:?}",
3018                            validation_error,
3019                            propagation_source.to_string(),
3020                            message.source
3021                        );
3022                    }
3023                }
3024
3025                // Handle messages
3026                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3027                    // Only process the amount of messages the configuration allows.
3028                    if self.config.max_messages_per_rpc().is_some()
3029                        && Some(count) >= self.config.max_messages_per_rpc()
3030                    {
3031                        warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3032                        break;
3033                    }
3034                    self.handle_received_message(raw_message, &propagation_source);
3035                }
3036
3037                // Handle control messages
3038                // group some control messages, this minimises SendEvents (code is simplified to handle each event at a time however)
3039                let mut ihave_msgs = vec![];
3040                let mut graft_msgs = vec![];
3041                let mut prune_msgs = vec![];
3042                for control_msg in rpc.control_msgs {
3043                    match control_msg {
3044                        GossipsubControlAction::IHave {
3045                            topic_hash,
3046                            message_ids,
3047                        } => {
3048                            ihave_msgs.push((topic_hash, message_ids));
3049                        }
3050                        GossipsubControlAction::IWant { message_ids } => {
3051                            self.handle_iwant(&propagation_source, message_ids)
3052                        }
3053                        GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
3054                        GossipsubControlAction::Prune {
3055                            topic_hash,
3056                            peers,
3057                            backoff,
3058                        } => prune_msgs.push((topic_hash, peers, backoff)),
3059                    }
3060                }
3061                if !ihave_msgs.is_empty() {
3062                    self.handle_ihave(&propagation_source, ihave_msgs);
3063                }
3064                if !graft_msgs.is_empty() {
3065                    self.handle_graft(&propagation_source, graft_msgs);
3066                }
3067                if !prune_msgs.is_empty() {
3068                    self.handle_prune(&propagation_source, prune_msgs);
3069                }
3070            }
3071        }
3072    }
3073
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
        _: &mut impl PollParameters,
    ) -> Poll<
        NetworkBehaviourAction<
            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
            Self::OutEvent,
        >,
    > {
        // Drain one queued action per poll. The queue stores the handler event
        // behind an `Arc` (presumably so one RPC can be queued for several
        // peers without cloning up front); each variant is rebuilt here to
        // turn the `Arc`-wrapped event into the plain `InEvent` the swarm
        // expects.
        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(match event {
                NetworkBehaviourAction::NotifyHandler {
                    peer_id,
                    handler,
                    event: send_event,
                } => {
                    // clone send event reference if others references are present
                    let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone());
                    NetworkBehaviourAction::NotifyHandler {
                        peer_id,
                        event,
                        handler,
                    }
                }
                // The remaining variants carry no handler event; they are
                // reconstructed only to change the action's generic event
                // type parameter.
                NetworkBehaviourAction::GenerateEvent(e) => {
                    NetworkBehaviourAction::GenerateEvent(e)
                }
                NetworkBehaviourAction::DialAddress { address } => {
                    NetworkBehaviourAction::DialAddress { address }
                }
                NetworkBehaviourAction::DialPeer { peer_id, condition } => {
                    NetworkBehaviourAction::DialPeer { peer_id, condition }
                }
                NetworkBehaviourAction::ReportObservedAddr { address, score } => {
                    NetworkBehaviourAction::ReportObservedAddr { address, score }
                }
            });
        }

        // update scores
        // Drain every elapsed tick so scores catch up even if polling was delayed.
        if let Some((peer_score, _, interval, _)) = &mut self.peer_score {
            while let Poll::Ready(Some(())) = interval.poll_next_unpin(cx) {
                peer_score.refresh_scores();
            }
        }

        // Run the periodic gossipsub heartbeat once per elapsed tick.
        while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) {
            self.heartbeat();
        }

        Poll::Pending
    }
3127}
3128
3129/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3130/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
3131/// that gets as input the number of filtered peers.
3132fn get_random_peers_dynamic(
3133    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
3134    peer_protocols: &HashMap<PeerId, PeerKind>,
3135    topic_hash: &TopicHash,
3136    // maps the number of total peers to the number of selected peers
3137    n_map: impl Fn(usize) -> usize,
3138    mut f: impl FnMut(&PeerId) -> bool,
3139) -> BTreeSet<PeerId> {
3140    let mut gossip_peers = match topic_peers.get(topic_hash) {
3141        // if they exist, filter the peers by `f`
3142        Some(peer_list) => peer_list
3143            .iter()
3144            .cloned()
3145            .filter(|p| {
3146                f(p) && match peer_protocols.get(p) {
3147                    Some(PeerKind::Gossipsub) => true,
3148                    Some(PeerKind::Gossipsubv1_1) => true,
3149                    _ => false,
3150                }
3151            })
3152            .collect(),
3153        None => Vec::new(),
3154    };
3155
3156    // if we have less than needed, return them
3157    let n = n_map(gossip_peers.len());
3158    if gossip_peers.len() <= n {
3159        debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3160        return gossip_peers.into_iter().collect();
3161    }
3162
3163    // we have more peers than needed, shuffle them and return n of them
3164    let mut rng = thread_rng();
3165    gossip_peers.partial_shuffle(&mut rng, n);
3166
3167    debug!("RANDOM PEERS: Got {:?} peers", n);
3168
3169    gossip_peers.into_iter().take(n).collect()
3170}
3171
3172/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3173/// filtered by the function `f`.
3174fn get_random_peers(
3175    topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
3176    peer_protocols: &HashMap<PeerId, PeerKind>,
3177    topic_hash: &TopicHash,
3178    n: usize,
3179    f: impl FnMut(&PeerId) -> bool,
3180) -> BTreeSet<PeerId> {
3181    get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f)
3182}
3183
3184/// Validates the combination of signing, privacy and message validation to ensure the
3185/// configuration will not reject published messages.
3186fn validate_config(
3187    authenticity: &MessageAuthenticity,
3188    validation_mode: &ValidationMode,
3189) -> Result<(), &'static str> {
3190    match validation_mode {
3191        ValidationMode::Anonymous => {
3192            if authenticity.is_signing() {
3193                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3194            }
3195
3196            if !authenticity.is_anonymous() {
3197                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3198            }
3199        }
3200        ValidationMode::Strict => {
3201            if !authenticity.is_signing() {
3202                return Err(
3203                    "Messages will be
3204                published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3205                the validation or privacy settings in the config"
3206                );
3207            }
3208        }
3209        _ => {}
3210    }
3211    Ok(())
3212}
3213
3214impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F> {
3215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3216        f.debug_struct("Gossipsub")
3217            .field("config", &self.config)
3218            .field("events", &self.events)
3219            .field("control_pool", &self.control_pool)
3220            .field("publish_config", &self.publish_config)
3221            .field("topic_peers", &self.topic_peers)
3222            .field("peer_topics", &self.peer_topics)
3223            .field("mesh", &self.mesh)
3224            .field("fanout", &self.fanout)
3225            .field("fanout_last_pub", &self.fanout_last_pub)
3226            .field("mcache", &self.mcache)
3227            .field("heartbeat", &self.heartbeat)
3228            .finish()
3229    }
3230}
3231
3232impl fmt::Debug for PublishConfig {
3233    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3234        match self {
3235            PublishConfig::Signing { author, .. } => {
3236                f.write_fmt(format_args!("PublishConfig::Signing({})", author))
3237            }
3238            PublishConfig::Author(author) => {
3239                f.write_fmt(format_args!("PublishConfig::Author({})", author))
3240            }
3241            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3242            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3243        }
3244    }
3245}
3246
#[cfg(test)]
mod local_test {
    //! Tests for RPC fragmentation against the configured `max_transmit_size`.
    use super::*;
    use crate::IdentTopic;
    use asynchronous_codec::Encoder;
    use quickcheck::*;
    use rand::Rng;

    /// An RPC with no subscriptions, messages or control messages.
    fn empty_rpc() -> GossipsubRpc {
        GossipsubRpc {
            subscriptions: Vec::new(),
            messages: Vec::new(),
            control_msgs: Vec::new(),
        }
    }

    /// A raw message with a fixed 100-byte payload, used to pad RPCs to a
    /// roughly known encoded size.
    fn test_message() -> RawGossipsubMessage {
        RawGossipsubMessage {
            source: Some(PeerId::random()),
            data: vec![0; 100],
            sequence_number: None,
            topic: TopicHash::from_raw("test_topic"),
            signature: None,
            key: None,
            validated: false,
        }
    }

    /// A subscribe action for a fixed test topic.
    fn test_subscription() -> GossipsubSubscription {
        GossipsubSubscription {
            action: GossipsubSubscriptionAction::Subscribe,
            topic_hash: IdentTopic::new("TestTopic").hash(),
        }
    }

    /// An IHAVE control message advertising five identical message ids.
    fn test_control() -> GossipsubControlAction {
        GossipsubControlAction::IHave {
            topic_hash: IdentTopic::new("TestTopic").hash(),
            message_ids: vec![MessageId(vec![12u8]); 5],
        }
    }

    // Arbitrary RPCs carry 0-9 subscriptions, messages and control messages
    // each, so quickcheck exercises a range of encoded sizes around the
    // fragmentation threshold.
    impl Arbitrary for GossipsubRpc {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let mut rpc = empty_rpc();

            for _ in 0..g.gen_range(0, 10) {
                rpc.subscriptions.push(test_subscription());
            }
            for _ in 0..g.gen_range(0, 10) {
                rpc.messages.push(test_message());
            }
            for _ in 0..g.gen_range(0, 10) {
                rpc.control_msgs.push(test_control());
            }
            rpc
        }
    }

    #[test]
    /// Tests RPC message fragmentation
    fn test_message_fragmentation_deterministic() {
        let max_transmit_size = 500;
        let config = crate::GossipsubConfigBuilder::default()
            .max_transmit_size(max_transmit_size)
            .validation_mode(ValidationMode::Permissive)
            .build()
            .unwrap();
        let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();

        // Message under the limit should be fine.
        let mut rpc = empty_rpc();
        rpc.messages.push(test_message());

        let mut rpc_proto = rpc.clone().into_protobuf();
        let fragmented_messages = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap();
        assert_eq!(
            fragmented_messages,
            vec![Arc::new(rpc_proto.clone())],
            "Messages under the limit shouldn't be fragmented"
        );

        // Messages over the limit should be split

        // Grow the RPC until its encoded size exceeds the transmit limit.
        while rpc_proto.encoded_len() < max_transmit_size {
            rpc.messages.push(test_message());
            rpc_proto = rpc.clone().into_protobuf();
        }

        let fragmented_messages = gs
            .fragment_message(Arc::new(rpc_proto))
            .expect("Should be able to fragment the messages");

        assert!(
            fragmented_messages.len() > 1,
            "the message should be fragmented"
        );

        // all fragmented messages should be under the limit
        for message in fragmented_messages {
            assert!(
                message.encoded_len() < max_transmit_size,
                "all messages should be less than the transmission size"
            );
        }
    }

    #[test]
    fn test_message_fragmentation() {
        // Property: an arbitrary RPC is fragmented iff its encoding exceeds
        // the transmit limit, every fragment fits under the limit, and every
        // fragment round-trips through the wire codec.
        fn prop(rpc: GossipsubRpc) {
            let max_transmit_size = 500;
            let config = crate::GossipsubConfigBuilder::default()
                .max_transmit_size(max_transmit_size)
                .validation_mode(ValidationMode::Permissive)
                .build()
                .unwrap();
            let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();

            // Build the same length-prefixed codec the protocol uses, capped
            // at the transmit limit, to prove each fragment is encodable.
            let mut length_codec = unsigned_varint::codec::UviBytes::default();
            length_codec.set_max_len(max_transmit_size);
            let mut codec =
                crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);

            let rpc_proto = rpc.into_protobuf();
            let fragmented_messages = gs
                .fragment_message(Arc::new(rpc_proto.clone()))
                .expect("Messages must be valid");

            if rpc_proto.encoded_len() < max_transmit_size {
                assert_eq!(
                    fragmented_messages.len(),
                    1,
                    "the message should not be fragmented"
                );
            } else {
                assert!(
                    fragmented_messages.len() > 1,
                    "the message should be fragmented"
                );
            }

            // all fragmented messages should be under the limit
            for message in fragmented_messages {
                assert!(
                    message.encoded_len() < max_transmit_size,
                    "all messages should be less than the transmission size: list size {} max size{}", message.encoded_len(), max_transmit_size
                );

                // ensure they can all be encoded
                let mut buf = bytes::BytesMut::with_capacity(message.encoded_len());
                codec
                    .encode(Arc::try_unwrap(message).unwrap(), &mut buf)
                    .unwrap()
            }
        }
        QuickCheck::new()
            .max_tests(100)
            .quickcheck(prop as fn(_) -> _)
    }
}