ant_libp2p_gossipsub/
behaviour.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use ant_libp2p_core as libp2p_core;
22use ant_libp2p_swarm as libp2p_swarm;
23
24use std::{
25    cmp::{max, Ordering, Ordering::Equal},
26    collections::{BTreeSet, HashMap, HashSet, VecDeque},
27    fmt,
28    fmt::Debug,
29    net::IpAddr,
30    task::{Context, Poll},
31    time::Duration,
32};
33
34use futures::FutureExt;
35use futures_timer::Delay;
36use libp2p_core::{
37    multiaddr::Protocol::{Ip4, Ip6},
38    transport::PortUse,
39    Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44    dial_opts::DialOpts,
45    ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46    THandlerOutEvent, ToSwarm,
47};
48use prometheus_client::registry::Registry;
49use quick_protobuf::{MessageWrite, Writer};
50use rand::{seq::SliceRandom, thread_rng};
51use web_time::{Instant, SystemTime};
52
53use crate::{
54    backoff::BackoffStorage,
55    config::{Config, ValidationMode},
56    gossip_promises::GossipPromises,
57    handler::{Handler, HandlerEvent, HandlerIn},
58    mcache::MessageCache,
59    metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
60    peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
61    protocol::SIGNING_PREFIX,
62    rpc::Sender,
63    rpc_proto::proto,
64    subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
65    time_cache::DuplicateCache,
66    topic::{Hasher, Topic, TopicHash},
67    transform::{DataTransform, IdentityTransform},
68    types::{
69        ControlAction, Graft, IHave, IWant, Message, MessageAcceptance, MessageId, PeerConnections,
70        PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction,
71    },
72    FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
73};
74
75#[cfg(test)]
76mod tests;
77
78/// Determines if published messages should be signed or not.
79///
80/// Without signing, a number of privacy preserving modes can be selected.
81///
82/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
83/// should be updated in the [`Config`] to allow for unsigned messages.
84#[derive(Clone)]
85pub enum MessageAuthenticity {
86    /// Message signing is enabled. The author will be the owner of the key and the sequence number
87    /// will be linearly increasing.
88    Signed(Keypair),
89    /// Message signing is disabled.
90    ///
91    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
92    /// number will be randomized.
93    Author(PeerId),
94    /// Message signing is disabled.
95    ///
96    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
97    /// randomized.
98    RandomAuthor,
99    /// Message signing is disabled.
100    ///
101    /// The author of the message and the sequence numbers are excluded from the message.
102    ///
103    /// NOTE: Excluding these fields may make these messages invalid by other nodes who
104    /// enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
105    /// for how to customise this for rust-libp2p gossipsub.  A custom `message_id`
106    /// function will need to be set to prevent all messages from a peer being filtered
107    /// as duplicates.
108    Anonymous,
109}
110
111impl MessageAuthenticity {
112    /// Returns true if signing is enabled.
113    pub fn is_signing(&self) -> bool {
114        matches!(self, MessageAuthenticity::Signed(_))
115    }
116
117    pub fn is_anonymous(&self) -> bool {
118        matches!(self, MessageAuthenticity::Anonymous)
119    }
120}
121
122/// Event that can be emitted by the gossipsub behaviour.
123#[derive(Debug)]
124pub enum Event {
125    /// A message has been received.
126    Message {
127        /// The peer that forwarded us this message.
128        propagation_source: PeerId,
129        /// The [`MessageId`] of the message. This should be referenced by the application when
130        /// validating a message (if required).
131        message_id: MessageId,
132        /// The decompressed message itself.
133        message: Message,
134    },
135    /// A remote subscribed to a topic.
136    Subscribed {
137        /// Remote that has subscribed.
138        peer_id: PeerId,
139        /// The topic it has subscribed to.
140        topic: TopicHash,
141    },
142    /// A remote unsubscribed from a topic.
143    Unsubscribed {
144        /// Remote that has unsubscribed.
145        peer_id: PeerId,
146        /// The topic it has subscribed from.
147        topic: TopicHash,
148    },
149    /// A peer that does not support gossipsub has connected.
150    GossipsubNotSupported { peer_id: PeerId },
151    /// A peer is not able to download messages in time.
152    SlowPeer {
153        /// The peer_id
154        peer_id: PeerId,
155        /// The types and amounts of failed messages that are occurring for this peer.
156        failed_messages: FailedMessages,
157    },
158}
159
160/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
161/// for further details.
162#[allow(clippy::large_enum_variant)]
163enum PublishConfig {
164    Signing {
165        keypair: Keypair,
166        author: PeerId,
167        inline_key: Option<Vec<u8>>,
168        last_seq_no: SequenceNumber,
169    },
170    Author(PeerId),
171    RandomAuthor,
172    Anonymous,
173}
174
175/// A strictly linearly increasing sequence number.
176///
177/// We start from the current time as unix timestamp in milliseconds.
178#[derive(Debug)]
179struct SequenceNumber(u64);
180
181impl SequenceNumber {
182    fn new() -> Self {
183        let unix_timestamp = SystemTime::now()
184            .duration_since(SystemTime::UNIX_EPOCH)
185            .expect("time to be linear")
186            .as_nanos();
187
188        Self(unix_timestamp as u64)
189    }
190
191    fn next(&mut self) -> u64 {
192        self.0 = self
193            .0
194            .checked_add(1)
195            .expect("to not exhaust u64 space for sequence numbers");
196
197        self.0
198    }
199}
200
201impl PublishConfig {
202    pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
203        match self {
204            Self::Signing { author, .. } => Some(author),
205            Self::Author(author) => Some(author),
206            _ => None,
207        }
208    }
209}
210
211impl From<MessageAuthenticity> for PublishConfig {
212    fn from(authenticity: MessageAuthenticity) -> Self {
213        match authenticity {
214            MessageAuthenticity::Signed(keypair) => {
215                let public_key = keypair.public();
216                let key_enc = public_key.encode_protobuf();
217                let key = if key_enc.len() <= 42 {
218                    // The public key can be inlined in [`rpc_proto::proto::::Message::from`], so we
219                    // don't include it specifically in the
220                    // [`rpc_proto::proto::Message::key`] field.
221                    None
222                } else {
223                    // Include the protobuf encoding of the public key in the message.
224                    Some(key_enc)
225                };
226
227                PublishConfig::Signing {
228                    keypair,
229                    author: public_key.to_peer_id(),
230                    inline_key: key,
231                    last_seq_no: SequenceNumber::new(),
232                }
233            }
234            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
235            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
236            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
237        }
238    }
239}
240
241/// Network behaviour that handles the gossipsub protocol.
242///
243/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
244/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
245/// appropriate level to accept unsigned messages.
246///
247/// The DataTransform trait allows applications to optionally add extra encoding/decoding
248/// functionality to the underlying messages. This is intended for custom compression algorithms.
249///
250/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
251/// prevent unwanted messages being propagated and evaluated.
252pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
253    /// Configuration providing gossipsub performance parameters.
254    config: Config,
255
256    /// Events that need to be yielded to the outside when polling.
257    events: VecDeque<ToSwarm<Event, HandlerIn>>,
258
259    /// Information used for publishing messages.
260    publish_config: PublishConfig,
261
262    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
263    /// duplicates from being propagated to the application and on the network.
264    duplicate_cache: DuplicateCache<MessageId>,
265
266    /// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
267    /// the set of [`ConnectionId`]s.
268    connected_peers: HashMap<PeerId, PeerConnections>,
269
270    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
271    /// forward messages to, outside of the scoring system.
272    explicit_peers: HashSet<PeerId>,
273
274    /// A list of peers that have been blacklisted by the user.
275    /// Messages are not sent to and are rejected from these peers.
276    blacklisted_peers: HashSet<PeerId>,
277
278    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
279    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
280
281    /// Map of topics to list of peers that we publish to, but don't subscribe to.
282    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
283
284    /// The last publish time for fanout topics.
285    fanout_last_pub: HashMap<TopicHash, Instant>,
286
287    /// Storage for backoffs
288    backoffs: BackoffStorage,
289
290    /// Message cache for the last few heartbeats.
291    mcache: MessageCache,
292
293    /// Heartbeat interval stream.
294    heartbeat: Delay,
295
296    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
297    /// clean up -- eg backoff clean up.
298    heartbeat_ticks: u64,
299
300    /// We remember all peers we found through peer exchange, since those peers are not considered
301    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
302    /// implementation to avoid possible love bombing attacks in PX. When disconnecting peers will
303    /// be removed from this list which may result in a true outbound rediscovery.
304    px_peers: HashSet<PeerId>,
305
306    /// Set of connected outbound peers (we only consider true outbound peers found through
307    /// discovery and not by PX).
308    outbound_peers: HashSet<PeerId>,
309
310    /// Stores optional peer score data together with thresholds, decay interval and gossip
311    /// promises.
312    peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
313
314    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
315    count_received_ihave: HashMap<PeerId, usize>,
316
317    /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat.
318    count_sent_iwant: HashMap<PeerId, usize>,
319
320    /// Short term cache for published message ids. This is used for penalizing peers sending
321    /// our own messages back if the messages are anonymous or use a random author.
322    published_message_ids: DuplicateCache<MessageId>,
323
324    /// The filter used to handle message subscriptions.
325    subscription_filter: F,
326
327    /// A general transformation function that can be applied to data received from the wire before
328    /// calculating the message-id and sending to the application. This is designed to allow the
329    /// user to implement arbitrary topic-based compression algorithms.
330    data_transform: D,
331
332    /// Keep track of a set of internal metrics relating to gossipsub.
333    metrics: Option<Metrics>,
334
335    /// Tracks the numbers of failed messages per peer-id.
336    failed_messages: HashMap<PeerId, FailedMessages>,
337}
338
339impl<D, F> Behaviour<D, F>
340where
341    D: DataTransform + Default,
342    F: TopicSubscriptionFilter + Default,
343{
344    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
345    /// [`Config`]. This has no subscription filter and uses no compression.
346    pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
347        Self::new_with_subscription_filter_and_transform(
348            privacy,
349            config,
350            None,
351            F::default(),
352            D::default(),
353        )
354    }
355
356    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
357    /// [`Config`]. This has no subscription filter and uses no compression.
358    /// Metrics can be evaluated by passing a reference to a [`Registry`].
359    pub fn new_with_metrics(
360        privacy: MessageAuthenticity,
361        config: Config,
362        metrics_registry: &mut Registry,
363        metrics_config: MetricsConfig,
364    ) -> Result<Self, &'static str> {
365        Self::new_with_subscription_filter_and_transform(
366            privacy,
367            config,
368            Some((metrics_registry, metrics_config)),
369            F::default(),
370            D::default(),
371        )
372    }
373}
374
375impl<D, F> Behaviour<D, F>
376where
377    D: DataTransform + Default,
378    F: TopicSubscriptionFilter,
379{
380    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
381    /// [`Config`] and a custom subscription filter.
382    pub fn new_with_subscription_filter(
383        privacy: MessageAuthenticity,
384        config: Config,
385        metrics: Option<(&mut Registry, MetricsConfig)>,
386        subscription_filter: F,
387    ) -> Result<Self, &'static str> {
388        Self::new_with_subscription_filter_and_transform(
389            privacy,
390            config,
391            metrics,
392            subscription_filter,
393            D::default(),
394        )
395    }
396}
397
398impl<D, F> Behaviour<D, F>
399where
400    D: DataTransform,
401    F: TopicSubscriptionFilter + Default,
402{
403    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
404    /// [`Config`] and a custom data transform.
405    pub fn new_with_transform(
406        privacy: MessageAuthenticity,
407        config: Config,
408        metrics: Option<(&mut Registry, MetricsConfig)>,
409        data_transform: D,
410    ) -> Result<Self, &'static str> {
411        Self::new_with_subscription_filter_and_transform(
412            privacy,
413            config,
414            metrics,
415            F::default(),
416            data_transform,
417        )
418    }
419}
420
421impl<D, F> Behaviour<D, F>
422where
423    D: DataTransform,
424    F: TopicSubscriptionFilter,
425{
426    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
427    /// [`Config`] and a custom subscription filter and data transform.
428    pub fn new_with_subscription_filter_and_transform(
429        privacy: MessageAuthenticity,
430        config: Config,
431        metrics: Option<(&mut Registry, MetricsConfig)>,
432        subscription_filter: F,
433        data_transform: D,
434    ) -> Result<Self, &'static str> {
435        // Set up the router given the configuration settings.
436
437        // We do not allow configurations where a published message would also be rejected if it
438        // were received locally.
439        validate_config(&privacy, config.validation_mode())?;
440
441        Ok(Behaviour {
442            metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
443            events: VecDeque::new(),
444            publish_config: privacy.into(),
445            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
446            explicit_peers: HashSet::new(),
447            blacklisted_peers: HashSet::new(),
448            mesh: HashMap::new(),
449            fanout: HashMap::new(),
450            fanout_last_pub: HashMap::new(),
451            backoffs: BackoffStorage::new(
452                &config.prune_backoff(),
453                config.heartbeat_interval(),
454                config.backoff_slack(),
455            ),
456            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
457            heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
458            heartbeat_ticks: 0,
459            px_peers: HashSet::new(),
460            outbound_peers: HashSet::new(),
461            peer_score: None,
462            count_received_ihave: HashMap::new(),
463            count_sent_iwant: HashMap::new(),
464            connected_peers: HashMap::new(),
465            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
466            config,
467            subscription_filter,
468            data_transform,
469            failed_messages: Default::default(),
470        })
471    }
472}
473
474impl<D, F> Behaviour<D, F>
475where
476    D: DataTransform + Send + 'static,
477    F: TopicSubscriptionFilter + Send + 'static,
478{
479    /// Lists the hashes of the topics we are currently subscribed to.
480    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
481        self.mesh.keys()
482    }
483
484    /// Lists all mesh peers for a certain topic hash.
485    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
486        self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
487    }
488
489    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
490        let mut res = BTreeSet::new();
491        for peers in self.mesh.values() {
492            res.extend(peers);
493        }
494        res.into_iter()
495    }
496
497    /// Lists all known peers and their associated subscribed topics.
498    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
499        self.connected_peers
500            .iter()
501            .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
502    }
503
504    /// Lists all known peers and their associated protocol.
505    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
506        self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
507    }
508
509    /// Returns the gossipsub score for a given peer, if one exists.
510    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
511        self.peer_score
512            .as_ref()
513            .map(|(score, ..)| score.score(peer_id))
514    }
515
516    /// Subscribe to a topic.
517    ///
518    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
519    /// subscribed.
520    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
521        tracing::debug!(%topic, "Subscribing to topic");
522        let topic_hash = topic.hash();
523        if !self.subscription_filter.can_subscribe(&topic_hash) {
524            return Err(SubscriptionError::NotAllowed);
525        }
526
527        if self.mesh.contains_key(&topic_hash) {
528            tracing::debug!(%topic, "Topic is already in the mesh");
529            return Ok(false);
530        }
531
532        // send subscription request to all peers
533        for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
534            tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
535            let event = RpcOut::Subscribe(topic_hash.clone());
536            self.send_message(peer_id, event);
537        }
538
539        // call JOIN(topic)
540        // this will add new peers to the mesh for the topic
541        self.join(&topic_hash);
542        tracing::debug!(%topic, "Subscribed to topic");
543        Ok(true)
544    }
545
546    /// Unsubscribes from a topic.
547    ///
548    /// Returns `true` if we were subscribed to this topic.
549    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
550        tracing::debug!(%topic, "Unsubscribing from topic");
551        let topic_hash = topic.hash();
552
553        if !self.mesh.contains_key(&topic_hash) {
554            tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
555            // we are not subscribed
556            return false;
557        }
558
559        // announce to all peers
560        for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
561            tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
562            let event = RpcOut::Unsubscribe(topic_hash.clone());
563            self.send_message(peer, event);
564        }
565
566        // call LEAVE(topic)
567        // this will remove the topic from the mesh
568        self.leave(&topic_hash);
569
570        tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
571        true
572    }
573
574    /// Publishes a message with multiple topics to the network.
575    pub fn publish(
576        &mut self,
577        topic: impl Into<TopicHash>,
578        data: impl Into<Vec<u8>>,
579    ) -> Result<MessageId, PublishError> {
580        let data = data.into();
581        let topic = topic.into();
582
583        // Transform the data before building a raw_message.
584        let transformed_data = self
585            .data_transform
586            .outbound_transform(&topic, data.clone())?;
587
588        // check that the size doesn't exceed the max transmission size.
589        if transformed_data.len() > self.config.max_transmit_size() {
590            return Err(PublishError::MessageTooLarge);
591        }
592
593        let raw_message = self.build_raw_message(topic, transformed_data)?;
594
595        // calculate the message id from the un-transformed data
596        let msg_id = self.config.message_id(&Message {
597            source: raw_message.source,
598            data, // the uncompressed form
599            sequence_number: raw_message.sequence_number,
600            topic: raw_message.topic.clone(),
601        });
602
603        // Check the if the message has been published before
604        if self.duplicate_cache.contains(&msg_id) {
605            // This message has already been seen. We don't re-publish messages that have already
606            // been published on the network.
607            tracing::warn!(
608                message=%msg_id,
609                "Not publishing a message that has already been published"
610            );
611            return Err(PublishError::Duplicate);
612        }
613
614        tracing::trace!(message=%msg_id, "Publishing message");
615
616        let topic_hash = raw_message.topic.clone();
617
618        let mut peers_on_topic = self
619            .connected_peers
620            .iter()
621            .filter(|(_, p)| p.topics.contains(&topic_hash))
622            .map(|(peer_id, _)| peer_id)
623            .peekable();
624
625        if peers_on_topic.peek().is_none() {
626            return Err(PublishError::InsufficientPeers);
627        }
628
629        let mut recipient_peers = HashSet::new();
630        if self.config.flood_publish() {
631            // Forward to all peers above score and all explicit peers
632            recipient_peers.extend(peers_on_topic.filter(|p| {
633                self.explicit_peers.contains(*p)
634                    || !self.score_below_threshold(p, |ts| ts.publish_threshold).0
635            }));
636        } else {
637            match self.mesh.get(&topic_hash) {
638                // Mesh peers
639                Some(mesh_peers) => {
640                    // We have a mesh set. We want to make sure to publish to at least `mesh_n`
641                    // peers (if possible).
642                    let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
643
644                    if needed_extra_peers > 0 {
645                        // We don't have `mesh_n` peers in our mesh, we will randomly select extras
646                        // and publish to them.
647
648                        // Get a random set of peers that are appropriate to send messages too.
649                        let peer_list = get_random_peers(
650                            &self.connected_peers,
651                            &topic_hash,
652                            needed_extra_peers,
653                            |peer| {
654                                !mesh_peers.contains(peer)
655                                    && !self.explicit_peers.contains(peer)
656                                    && !self
657                                        .score_below_threshold(peer, |pst| pst.publish_threshold)
658                                        .0
659                            },
660                        );
661                        recipient_peers.extend(peer_list);
662                    }
663
664                    recipient_peers.extend(mesh_peers);
665                }
666                // Gossipsub peers
667                None => {
668                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
669                    // If we have fanout peers add them to the map.
670                    if self.fanout.contains_key(&topic_hash) {
671                        for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
672                            recipient_peers.insert(*peer);
673                        }
674                    } else {
675                        // We have no fanout peers, select mesh_n of them and add them to the fanout
676                        let mesh_n = self.config.mesh_n();
677                        let new_peers =
678                            get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
679                                |p| {
680                                    !self.explicit_peers.contains(p)
681                                        && !self
682                                            .score_below_threshold(p, |pst| pst.publish_threshold)
683                                            .0
684                                }
685                            });
686                        // Add the new peers to the fanout and recipient peers
687                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
688                        for peer in new_peers {
689                            tracing::debug!(%peer, "Peer added to fanout");
690                            recipient_peers.insert(peer);
691                        }
692                    }
693                    // We are publishing to fanout peers - update the time we published
694                    self.fanout_last_pub
695                        .insert(topic_hash.clone(), Instant::now());
696                }
697            }
698
699            // Explicit peers that are part of the topic
700            recipient_peers
701                .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
702
703            // Floodsub peers
704            for (peer, connections) in &self.connected_peers {
705                if connections.kind == PeerKind::Floodsub
706                    && !self
707                        .score_below_threshold(peer, |ts| ts.publish_threshold)
708                        .0
709                {
710                    recipient_peers.insert(*peer);
711                }
712            }
713        }
714
715        // If the message isn't a duplicate and we have sent it to some peers add it to the
716        // duplicate cache and memcache.
717        self.duplicate_cache.insert(msg_id.clone());
718        self.mcache.put(&msg_id, raw_message.clone());
719
720        // If the message is anonymous or has a random author add it to the published message ids
721        // cache.
722        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
723            if !self.config.allow_self_origin() {
724                self.published_message_ids.insert(msg_id.clone());
725            }
726        }
727
728        // Send to peers we know are subscribed to the topic.
729        let mut publish_failed = true;
730        for peer_id in recipient_peers.iter() {
731            tracing::trace!(peer=%peer_id, "Sending message to peer");
732            if self.send_message(
733                *peer_id,
734                RpcOut::Publish {
735                    message: raw_message.clone(),
736                    timeout: Delay::new(self.config.publish_queue_duration()),
737                },
738            ) {
739                publish_failed = false
740            }
741        }
742
743        if recipient_peers.is_empty() {
744            return Err(PublishError::InsufficientPeers);
745        }
746
747        if publish_failed {
748            return Err(PublishError::AllQueuesFull(recipient_peers.len()));
749        }
750
751        tracing::debug!(message=%msg_id, "Published message");
752
753        if let Some(metrics) = self.metrics.as_mut() {
754            metrics.register_published_message(&topic_hash);
755        }
756
757        Ok(msg_id)
758    }
759
760    /// This function should be called when [`Config::validate_messages()`] is `true` after
761    /// the message got validated by the caller. Messages are stored in the ['Memcache'] and
762    /// validation is expected to be fast enough that the messages should still exist in the cache.
763    /// There are three possible validation outcomes and the outcome is given in acceptance.
764    ///
765    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
766    /// network. The `propagation_source` parameter indicates who the message was received by and
767    /// will not be forwarded back to that peer.
768    ///
769    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
770    /// and the Pâ‚„ penalty will be applied to the `propagation_source`.
771    //
772    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
773    /// but no Pâ‚„ penalty will be applied.
774    ///
775    /// This function will return true if the message was found in the cache and false if was not
776    /// in the cache anymore.
777    ///
778    /// This should only be called once per message.
779    pub fn report_message_validation_result(
780        &mut self,
781        msg_id: &MessageId,
782        propagation_source: &PeerId,
783        acceptance: MessageAcceptance,
784    ) -> bool {
785        let reject_reason = match acceptance {
786            MessageAcceptance::Accept => {
787                let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
788                    Some((raw_message, originating_peers)) => {
789                        (raw_message.clone(), originating_peers)
790                    }
791                    None => {
792                        tracing::warn!(
793                            message=%msg_id,
794                            "Message not in cache. Ignoring forwarding"
795                        );
796                        if let Some(metrics) = self.metrics.as_mut() {
797                            metrics.memcache_miss();
798                        }
799                        return false;
800                    }
801                };
802
803                if let Some(metrics) = self.metrics.as_mut() {
804                    metrics.register_msg_validation(&raw_message.topic, &acceptance);
805                }
806
807                self.forward_msg(
808                    msg_id,
809                    raw_message,
810                    Some(propagation_source),
811                    originating_peers,
812                );
813                return true;
814            }
815            MessageAcceptance::Reject => RejectReason::ValidationFailed,
816            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
817        };
818
819        if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
820            if let Some(metrics) = self.metrics.as_mut() {
821                metrics.register_msg_validation(&raw_message.topic, &acceptance);
822            }
823
824            // Tell peer_score about reject
825            // Reject the original source, and any duplicates we've seen from other peers.
826            if let Some((peer_score, ..)) = &mut self.peer_score {
827                peer_score.reject_message(
828                    propagation_source,
829                    msg_id,
830                    &raw_message.topic,
831                    reject_reason,
832                );
833                for peer in originating_peers.iter() {
834                    peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
835                }
836            }
837            true
838        } else {
839            tracing::warn!(message=%msg_id, "Rejected message not in cache");
840            false
841        }
842    }
843
844    /// Adds a new peer to the list of explicitly connected peers.
845    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
846        tracing::debug!(peer=%peer_id, "Adding explicit peer");
847
848        self.explicit_peers.insert(*peer_id);
849
850        self.check_explicit_peer_connection(peer_id);
851    }
852
853    /// This removes the peer from explicitly connected peers, note that this does not disconnect
854    /// the peer.
855    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
856        tracing::debug!(peer=%peer_id, "Removing explicit peer");
857        self.explicit_peers.remove(peer_id);
858    }
859
860    /// Blacklists a peer. All messages from this peer will be rejected and any message that was
861    /// created by this peer will be rejected.
862    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
863        if self.blacklisted_peers.insert(*peer_id) {
864            tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
865        }
866    }
867
868    /// Removes a peer from the blacklist if it has previously been blacklisted.
869    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
870        if self.blacklisted_peers.remove(peer_id) {
871            tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
872        }
873    }
874
875    /// Activates the peer scoring system with the given parameters. This will reset all scores
876    /// if there was already another peer scoring system activated. Returns an error if the
877    /// params are not valid or if they got already set.
878    pub fn with_peer_score(
879        &mut self,
880        params: PeerScoreParams,
881        threshold: PeerScoreThresholds,
882    ) -> Result<(), String> {
883        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
884    }
885
886    /// Activates the peer scoring system with the given parameters and a message delivery time
887    /// callback. Returns an error if the parameters got already set.
888    pub fn with_peer_score_and_message_delivery_time_callback(
889        &mut self,
890        params: PeerScoreParams,
891        threshold: PeerScoreThresholds,
892        callback: Option<fn(&PeerId, &TopicHash, f64)>,
893    ) -> Result<(), String> {
894        params.validate()?;
895        threshold.validate()?;
896
897        if self.peer_score.is_some() {
898            return Err("Peer score set twice".into());
899        }
900
901        let interval = Delay::new(params.decay_interval);
902        let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
903        self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
904        Ok(())
905    }
906
907    /// Sets scoring parameters for a topic.
908    ///
909    /// The [`Self::with_peer_score()`] must first be called to initialise peer scoring.
910    pub fn set_topic_params<H: Hasher>(
911        &mut self,
912        topic: Topic<H>,
913        params: TopicScoreParams,
914    ) -> Result<(), &'static str> {
915        if let Some((peer_score, ..)) = &mut self.peer_score {
916            peer_score.set_topic_params(topic.hash(), params);
917            Ok(())
918        } else {
919            Err("Peer score must be initialised with `with_peer_score()`")
920        }
921    }
922
923    /// Returns a scoring parameters for a topic if existent.
924    pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
925        self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
926    }
927
928    /// Sets the application specific score for a peer. Returns true if scoring is active and
929    /// the peer is connected or if the score of the peer is not yet expired, false otherwise.
930    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
931        if let Some((peer_score, ..)) = &mut self.peer_score {
932            peer_score.set_application_score(peer_id, new_score)
933        } else {
934            false
935        }
936    }
937
938    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
939    fn join(&mut self, topic_hash: &TopicHash) {
940        tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
941
942        // if we are already in the mesh, return
943        if self.mesh.contains_key(topic_hash) {
944            tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
945            return;
946        }
947
948        let mut added_peers = HashSet::new();
949
950        if let Some(m) = self.metrics.as_mut() {
951            m.joined(topic_hash)
952        }
953
954        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
955        // removing the fanout entry.
956        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
957            tracing::debug!(
958                topic=%topic_hash,
959                "JOIN: Removing peers from the fanout for topic"
960            );
961
962            // remove explicit peers, peers with negative scores, and backoffed peers
963            peers.retain(|p| {
964                !self.explicit_peers.contains(p)
965                    && !self.score_below_threshold(p, |_| 0.0).0
966                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
967            });
968
969            // Add up to mesh_n of them to the mesh
970            // NOTE: These aren't randomly added, currently FIFO
971            let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
972            tracing::debug!(
973                topic=%topic_hash,
974                "JOIN: Adding {:?} peers from the fanout for topic",
975                add_peers
976            );
977            added_peers.extend(peers.iter().take(add_peers));
978
979            self.mesh.insert(
980                topic_hash.clone(),
981                peers.into_iter().take(add_peers).collect(),
982            );
983
984            // remove the last published time
985            self.fanout_last_pub.remove(topic_hash);
986        }
987
988        let fanaout_added = added_peers.len();
989        if let Some(m) = self.metrics.as_mut() {
990            m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
991        }
992
993        // check if we need to get more peers, which we randomly select
994        if added_peers.len() < self.config.mesh_n() {
995            // get the peers
996            let new_peers = get_random_peers(
997                &self.connected_peers,
998                topic_hash,
999                self.config.mesh_n() - added_peers.len(),
1000                |peer| {
1001                    !added_peers.contains(peer)
1002                        && !self.explicit_peers.contains(peer)
1003                        && !self.score_below_threshold(peer, |_| 0.0).0
1004                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1005                },
1006            );
1007            added_peers.extend(new_peers.clone());
1008            // add them to the mesh
1009            tracing::debug!(
1010                "JOIN: Inserting {:?} random peers into the mesh",
1011                new_peers.len()
1012            );
1013            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1014            mesh_peers.extend(new_peers);
1015        }
1016
1017        let random_added = added_peers.len() - fanaout_added;
1018        if let Some(m) = self.metrics.as_mut() {
1019            m.peers_included(topic_hash, Inclusion::Random, random_added)
1020        }
1021
1022        for peer_id in added_peers {
1023            // Send a GRAFT control message
1024            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1025            if let Some((peer_score, ..)) = &mut self.peer_score {
1026                peer_score.graft(&peer_id, topic_hash.clone());
1027            }
1028            self.send_message(
1029                peer_id,
1030                RpcOut::Graft(Graft {
1031                    topic_hash: topic_hash.clone(),
1032                }),
1033            );
1034
1035            // If the peer did not previously exist in any mesh, inform the handler
1036            peer_added_to_mesh(
1037                peer_id,
1038                vec![topic_hash],
1039                &self.mesh,
1040                &mut self.events,
1041                &self.connected_peers,
1042            );
1043        }
1044
1045        let mesh_peers = self.mesh_peers(topic_hash).count();
1046        if let Some(m) = self.metrics.as_mut() {
1047            m.set_mesh_peers(topic_hash, mesh_peers)
1048        }
1049
1050        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1051    }
1052
1053    /// Creates a PRUNE gossipsub action.
1054    fn make_prune(
1055        &mut self,
1056        topic_hash: &TopicHash,
1057        peer: &PeerId,
1058        do_px: bool,
1059        on_unsubscribe: bool,
1060    ) -> Prune {
1061        if let Some((peer_score, ..)) = &mut self.peer_score {
1062            peer_score.prune(peer, topic_hash.clone());
1063        }
1064
1065        match self.connected_peers.get(peer).map(|v| &v.kind) {
1066            Some(PeerKind::Floodsub) => {
1067                tracing::error!("Attempted to prune a Floodsub peer");
1068            }
1069            Some(PeerKind::Gossipsub) => {
1070                // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
1071                return Prune {
1072                    topic_hash: topic_hash.clone(),
1073                    peers: Vec::new(),
1074                    backoff: None,
1075                };
1076            }
1077            None => {
1078                tracing::error!("Attempted to Prune an unknown peer");
1079            }
1080            _ => {} // Gossipsub 1.1 peer perform the `Prune`
1081        }
1082
1083        // Select peers for peer exchange
1084        let peers = if do_px {
1085            get_random_peers(
1086                &self.connected_peers,
1087                topic_hash,
1088                self.config.prune_peers(),
1089                |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1090            )
1091            .into_iter()
1092            .map(|p| PeerInfo { peer_id: Some(p) })
1093            .collect()
1094        } else {
1095            Vec::new()
1096        };
1097
1098        let backoff = if on_unsubscribe {
1099            self.config.unsubscribe_backoff()
1100        } else {
1101            self.config.prune_backoff()
1102        };
1103
1104        // update backoff
1105        self.backoffs.update_backoff(topic_hash, peer, backoff);
1106
1107        Prune {
1108            topic_hash: topic_hash.clone(),
1109            peers,
1110            backoff: Some(backoff.as_secs()),
1111        }
1112    }
1113
1114    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
1115    fn leave(&mut self, topic_hash: &TopicHash) {
1116        tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1117
1118        // If our mesh contains the topic, send prune to peers and delete it from the mesh
1119        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1120            if let Some(m) = self.metrics.as_mut() {
1121                m.left(topic_hash)
1122            }
1123            for peer_id in peers {
1124                // Send a PRUNE control message
1125                tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1126
1127                let on_unsubscribe = true;
1128                let prune =
1129                    self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1130                self.send_message(peer_id, RpcOut::Prune(prune));
1131
1132                // If the peer did not previously exist in any mesh, inform the handler
1133                peer_removed_from_mesh(
1134                    peer_id,
1135                    topic_hash,
1136                    &self.mesh,
1137                    &mut self.events,
1138                    &self.connected_peers,
1139                );
1140            }
1141        }
1142        tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1143    }
1144
1145    /// Checks if the given peer is still connected and if not dials the peer again.
1146    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1147        if !self.connected_peers.contains_key(peer_id) {
1148            // Connect to peer
1149            tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1150            self.events.push_back(ToSwarm::Dial {
1151                opts: DialOpts::peer_id(*peer_id).build(),
1152            });
1153        }
1154    }
1155
1156    /// Determines if a peer's score is below a given `PeerScoreThreshold` chosen via the
1157    /// `threshold` parameter.
1158    fn score_below_threshold(
1159        &self,
1160        peer_id: &PeerId,
1161        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1162    ) -> (bool, f64) {
1163        Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1164    }
1165
1166    fn score_below_threshold_from_scores(
1167        peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
1168        peer_id: &PeerId,
1169        threshold: impl Fn(&PeerScoreThresholds) -> f64,
1170    ) -> (bool, f64) {
1171        if let Some((peer_score, thresholds, ..)) = peer_score {
1172            let score = peer_score.score(peer_id);
1173            if score < threshold(thresholds) {
1174                return (true, score);
1175            }
1176            (false, score)
1177        } else {
1178            (false, 0.0)
1179        }
1180    }
1181
1182    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
1183    /// requests it with an IWANT control message.
1184    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1185        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
1186        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1187            tracing::debug!(
1188                peer=%peer_id,
1189                %score,
1190                "IHAVE: ignoring peer with score below threshold"
1191            );
1192            return;
1193        }
1194
1195        // IHAVE flood protection
1196        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1197        *peer_have += 1;
1198        if *peer_have > self.config.max_ihave_messages() {
1199            tracing::debug!(
1200                peer=%peer_id,
1201                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1202            interval; ignoring",
1203                *peer_have
1204            );
1205            return;
1206        }
1207
1208        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1209            if *iasked >= self.config.max_ihave_length() {
1210                tracing::debug!(
1211                    peer=%peer_id,
1212                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
1213                    *iasked
1214                );
1215                return;
1216            }
1217        }
1218
1219        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1220
1221        let mut iwant_ids = HashSet::new();
1222
1223        let want_message = |id: &MessageId| {
1224            if self.duplicate_cache.contains(id) {
1225                return false;
1226            }
1227
1228            self.peer_score
1229                .as_ref()
1230                .map(|(_, _, _, promises)| !promises.contains(id))
1231                .unwrap_or(true)
1232        };
1233
1234        for (topic, ids) in ihave_msgs {
1235            // only process the message if we are subscribed
1236            if !self.mesh.contains_key(&topic) {
1237                tracing::debug!(
1238                    %topic,
1239                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1240                );
1241                continue;
1242            }
1243
1244            for id in ids.into_iter().filter(want_message) {
1245                // have not seen this message and are not currently requesting it
1246                if iwant_ids.insert(id) {
1247                    // Register the IWANT metric
1248                    if let Some(metrics) = self.metrics.as_mut() {
1249                        metrics.register_iwant(&topic);
1250                    }
1251                }
1252            }
1253        }
1254
1255        if !iwant_ids.is_empty() {
1256            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1257            let mut iask = iwant_ids.len();
1258            if *iasked + iask > self.config.max_ihave_length() {
1259                iask = self.config.max_ihave_length().saturating_sub(*iasked);
1260            }
1261
1262            // Send the list of IWANT control messages
1263            tracing::debug!(
1264                peer=%peer_id,
1265                "IHAVE: Asking for {} out of {} messages from peer",
1266                iask,
1267                iwant_ids.len()
1268            );
1269
1270            // Ask in random order
1271            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1272            let mut rng = thread_rng();
1273            iwant_ids_vec.partial_shuffle(&mut rng, iask);
1274
1275            iwant_ids_vec.truncate(iask);
1276            *iasked += iask;
1277
1278            if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
1279                gossip_promises.add_promise(
1280                    *peer_id,
1281                    &iwant_ids_vec,
1282                    Instant::now() + self.config.iwant_followup_time(),
1283                );
1284            }
1285            tracing::trace!(
1286                peer=%peer_id,
1287                "IHAVE: Asking for the following messages from peer: {:?}",
1288                iwant_ids_vec
1289            );
1290
1291            self.send_message(
1292                *peer_id,
1293                RpcOut::IWant(IWant {
1294                    message_ids: iwant_ids_vec,
1295                }),
1296            );
1297        }
1298        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1299    }
1300
1301    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
1302    /// forwarded to the requesting peer.
1303    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1304        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
1305        if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1306            tracing::debug!(
1307                peer=%peer_id,
1308                "IWANT: ignoring peer with score below threshold [score = {}]",
1309                score
1310            );
1311            return;
1312        }
1313
1314        tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1315
1316        for id in iwant_msgs {
1317            // If we have it and the IHAVE count is not above the threshold,
1318            // forward the message.
1319            if let Some((msg, count)) = self
1320                .mcache
1321                .get_with_iwant_counts(&id, peer_id)
1322                .map(|(msg, count)| (msg.clone(), count))
1323            {
1324                if count > self.config.gossip_retransimission() {
1325                    tracing::debug!(
1326                        peer=%peer_id,
1327                        message=%id,
1328                        "IWANT: Peer has asked for message too many times; ignoring request"
1329                    );
1330                } else {
1331                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1332                    self.send_message(
1333                        *peer_id,
1334                        RpcOut::Forward {
1335                            message: msg,
1336                            timeout: Delay::new(self.config.forward_queue_duration()),
1337                        },
1338                    );
1339                }
1340            }
1341        }
1342        tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1343    }
1344
1345    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
1346    /// responds with PRUNE messages.
1347    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1348        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1349
1350        let mut to_prune_topics = HashSet::new();
1351
1352        let mut do_px = self.config.do_px();
1353
1354        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1355            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1356            return;
1357        };
1358
1359        // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
1360        // and they must be subscribed to the topic. Ensure we have recorded the mapping.
1361        for topic in &topics {
1362            if connected_peer.topics.insert(topic.clone()) {
1363                if let Some(m) = self.metrics.as_mut() {
1364                    m.inc_topic_peers(topic);
1365                }
1366            }
1367        }
1368
1369        // we don't GRAFT to/from explicit peers; complain loudly if this happens
1370        if self.explicit_peers.contains(peer_id) {
1371            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1372            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
1373            to_prune_topics = topics.into_iter().collect();
1374            // but don't PX
1375            do_px = false
1376        } else {
1377            let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1378            let now = Instant::now();
1379            for topic_hash in topics {
1380                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1381                    // if the peer is already in the mesh ignore the graft
1382                    if peers.contains(peer_id) {
1383                        tracing::debug!(
1384                            peer=%peer_id,
1385                            topic=%&topic_hash,
1386                            "GRAFT: Received graft for peer that is already in topic"
1387                        );
1388                        continue;
1389                    }
1390
1391                    // make sure we are not backing off that peer
1392                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1393                    {
1394                        if backoff_time > now {
1395                            tracing::warn!(
1396                                peer=%peer_id,
1397                                "[Penalty] Peer attempted graft within backoff time, penalizing"
1398                            );
1399                            // add behavioural penalty
1400                            if let Some((peer_score, ..)) = &mut self.peer_score {
1401                                if let Some(metrics) = self.metrics.as_mut() {
1402                                    metrics.register_score_penalty(Penalty::GraftBackoff);
1403                                }
1404                                peer_score.add_penalty(peer_id, 1);
1405
1406                                // check the flood cutoff
1407                                // See: https://github.com/rust-lang/rust-clippy/issues/10061
1408                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1409                                let flood_cutoff = (backoff_time
1410                                    + self.config.graft_flood_threshold())
1411                                    - self.config.prune_backoff();
1412                                if flood_cutoff > now {
1413                                    // extra penalty
1414                                    peer_score.add_penalty(peer_id, 1);
1415                                }
1416                            }
1417                            // no PX
1418                            do_px = false;
1419
1420                            to_prune_topics.insert(topic_hash.clone());
1421                            continue;
1422                        }
1423                    }
1424
1425                    // check the score
1426                    if below_zero {
1427                        // we don't GRAFT peers with negative score
1428                        tracing::debug!(
1429                            peer=%peer_id,
1430                            %score,
1431                            topic=%topic_hash,
1432                            "GRAFT: ignoring peer with negative score"
1433                        );
1434                        // we do send them PRUNE however, because it's a matter of protocol
1435                        // correctness
1436                        to_prune_topics.insert(topic_hash.clone());
1437                        // but we won't PX to them
1438                        do_px = false;
1439                        continue;
1440                    }
1441
1442                    // check mesh upper bound and only allow graft if the upper bound is not reached
1443                    // or if it is an outbound peer
1444                    if peers.len() >= self.config.mesh_n_high()
1445                        && !self.outbound_peers.contains(peer_id)
1446                    {
1447                        to_prune_topics.insert(topic_hash.clone());
1448                        continue;
1449                    }
1450
1451                    // add peer to the mesh
1452                    tracing::debug!(
1453                        peer=%peer_id,
1454                        topic=%topic_hash,
1455                        "GRAFT: Mesh link added for peer in topic"
1456                    );
1457
1458                    if peers.insert(*peer_id) {
1459                        if let Some(m) = self.metrics.as_mut() {
1460                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1461                        }
1462                    }
1463
1464                    // If the peer did not previously exist in any mesh, inform the handler
1465                    peer_added_to_mesh(
1466                        *peer_id,
1467                        vec![&topic_hash],
1468                        &self.mesh,
1469                        &mut self.events,
1470                        &self.connected_peers,
1471                    );
1472
1473                    if let Some((peer_score, ..)) = &mut self.peer_score {
1474                        peer_score.graft(peer_id, topic_hash);
1475                    }
1476                } else {
1477                    // don't do PX when there is an unknown topic to avoid leaking our peers
1478                    do_px = false;
1479                    tracing::debug!(
1480                        peer=%peer_id,
1481                        topic=%topic_hash,
1482                        "GRAFT: Received graft for unknown topic from peer"
1483                    );
1484                    // spam hardening: ignore GRAFTs for unknown topics
1485                    continue;
1486                }
1487            }
1488        }
1489
1490        if !to_prune_topics.is_empty() {
1491            // build the prune messages to send
1492            let on_unsubscribe = false;
1493
1494            for prune in to_prune_topics
1495                .iter()
1496                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1497                .collect::<Vec<_>>()
1498            {
1499                self.send_message(*peer_id, RpcOut::Prune(prune));
1500            }
1501            // Send the prune messages to the peer
1502            tracing::debug!(
1503                peer=%peer_id,
1504                "GRAFT: Not subscribed to topics -  Sending PRUNE to peer"
1505            );
1506        }
1507        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1508    }
1509
1510    fn remove_peer_from_mesh(
1511        &mut self,
1512        peer_id: &PeerId,
1513        topic_hash: &TopicHash,
1514        backoff: Option<u64>,
1515        always_update_backoff: bool,
1516        reason: Churn,
1517    ) {
1518        let mut update_backoff = always_update_backoff;
1519        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1520            // remove the peer if it exists in the mesh
1521            if peers.remove(peer_id) {
1522                tracing::debug!(
1523                    peer=%peer_id,
1524                    topic=%topic_hash,
1525                    "PRUNE: Removing peer from the mesh for topic"
1526                );
1527                if let Some(m) = self.metrics.as_mut() {
1528                    m.peers_removed(topic_hash, reason, 1)
1529                }
1530
1531                if let Some((peer_score, ..)) = &mut self.peer_score {
1532                    peer_score.prune(peer_id, topic_hash.clone());
1533                }
1534
1535                update_backoff = true;
1536
1537                // inform the handler
1538                peer_removed_from_mesh(
1539                    *peer_id,
1540                    topic_hash,
1541                    &self.mesh,
1542                    &mut self.events,
1543                    &self.connected_peers,
1544                );
1545            }
1546        }
1547        if update_backoff {
1548            let time = if let Some(backoff) = backoff {
1549                Duration::from_secs(backoff)
1550            } else {
1551                self.config.prune_backoff()
1552            };
1553            // is there a backoff specified by the peer? if so obey it.
1554            self.backoffs.update_backoff(topic_hash, peer_id, time);
1555        }
1556    }
1557
1558    /// Handles PRUNE control messages. Removes peer from the mesh.
1559    fn handle_prune(
1560        &mut self,
1561        peer_id: &PeerId,
1562        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1563    ) {
1564        tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1565        let (below_threshold, score) =
1566            self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1567        for (topic_hash, px, backoff) in prune_data {
1568            self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
1569
1570            if self.mesh.contains_key(&topic_hash) {
1571                // connect to px peers
1572                if !px.is_empty() {
1573                    // we ignore PX from peers with insufficient score
1574                    if below_threshold {
1575                        tracing::debug!(
1576                            peer=%peer_id,
1577                            %score,
1578                            topic=%topic_hash,
1579                            "PRUNE: ignoring PX from peer with insufficient score"
1580                        );
1581                        continue;
1582                    }
1583
1584                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1585                    // know their multiaddr. Until SignedRecords are spec'd this
1586                    // remains a stub. By default `config.prune_peers()` is set to zero and
1587                    // this is skipped. If the user modifies this, this will only be able to
1588                    // dial already known peers (from an external discovery mechanism for
1589                    // example).
1590                    if self.config.prune_peers() > 0 {
1591                        self.px_connect(px);
1592                    }
1593                }
1594            }
1595        }
1596        tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1597    }
1598
1599    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1600        let n = self.config.prune_peers();
1601        // Ignore peerInfo with no ID
1602        //
1603        // TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1604        // signed peer record?
1605        px.retain(|p| p.peer_id.is_some());
1606        if px.len() > n {
1607            // only use at most prune_peers many random peers
1608            let mut rng = thread_rng();
1609            px.partial_shuffle(&mut rng, n);
1610            px = px.into_iter().take(n).collect();
1611        }
1612
1613        for p in px {
1614            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1615            // it, see https://github.com/libp2p/specs/pull/217
1616            if let Some(peer_id) = p.peer_id {
1617                // mark as px peer
1618                self.px_peers.insert(peer_id);
1619
1620                // dial peer
1621                self.events.push_back(ToSwarm::Dial {
1622                    opts: DialOpts::peer_id(peer_id).build(),
1623                });
1624            }
1625        }
1626    }
1627
1628    /// Applies some basic checks to whether this message is valid. Does not apply user validation
1629    /// checks.
1630    fn message_is_valid(
1631        &mut self,
1632        msg_id: &MessageId,
1633        raw_message: &mut RawMessage,
1634        propagation_source: &PeerId,
1635    ) -> bool {
1636        tracing::debug!(
1637            peer=%propagation_source,
1638            message=%msg_id,
1639            "Handling message from peer"
1640        );
1641
1642        // Reject any message from a blacklisted peer
1643        if self.blacklisted_peers.contains(propagation_source) {
1644            tracing::debug!(
1645                peer=%propagation_source,
1646                "Rejecting message from blacklisted peer"
1647            );
1648            if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1649                peer_score.reject_message(
1650                    propagation_source,
1651                    msg_id,
1652                    &raw_message.topic,
1653                    RejectReason::BlackListedPeer,
1654                );
1655                gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
1656            }
1657            return false;
1658        }
1659
1660        // Also reject any message that originated from a blacklisted peer
1661        if let Some(source) = raw_message.source.as_ref() {
1662            if self.blacklisted_peers.contains(source) {
1663                tracing::debug!(
1664                    peer=%propagation_source,
1665                    %source,
1666                    "Rejecting message from peer because of blacklisted source"
1667                );
1668                self.handle_invalid_message(
1669                    propagation_source,
1670                    raw_message,
1671                    RejectReason::BlackListedSource,
1672                );
1673                return false;
1674            }
1675        }
1676
1677        // If we are not validating messages, assume this message is validated
1678        // This will allow the message to be gossiped without explicitly calling
1679        // `validate_message`.
1680        if !self.config.validate_messages() {
1681            raw_message.validated = true;
1682        }
1683
1684        // reject messages claiming to be from ourselves but not locally published
1685        let self_published = !self.config.allow_self_origin()
1686            && if let Some(own_id) = self.publish_config.get_own_id() {
1687                own_id != propagation_source
1688                    && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1689            } else {
1690                self.published_message_ids.contains(msg_id)
1691            };
1692
1693        if self_published {
1694            tracing::debug!(
1695                message=%msg_id,
1696                source=%propagation_source,
1697                "Dropping message claiming to be from self but forwarded from source"
1698            );
1699            self.handle_invalid_message(propagation_source, raw_message, RejectReason::SelfOrigin);
1700            return false;
1701        }
1702
1703        true
1704    }
1705
1706    /// Handles a newly received [`RawMessage`].
1707    ///
1708    /// Forwards the message to all peers in the mesh.
1709    fn handle_received_message(
1710        &mut self,
1711        mut raw_message: RawMessage,
1712        propagation_source: &PeerId,
1713    ) {
1714        // Record the received metric
1715        if let Some(metrics) = self.metrics.as_mut() {
1716            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1717        }
1718
1719        // Try and perform the data transform to the message. If it fails, consider it invalid.
1720        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1721            Ok(message) => message,
1722            Err(e) => {
1723                tracing::debug!("Invalid message. Transform error: {:?}", e);
1724                // Reject the message and return
1725                self.handle_invalid_message(
1726                    propagation_source,
1727                    &raw_message,
1728                    RejectReason::ValidationError(ValidationError::TransformFailed),
1729                );
1730                return;
1731            }
1732        };
1733
1734        // Calculate the message id on the transformed data.
1735        let msg_id = self.config.message_id(&message);
1736
1737        // Check the validity of the message
1738        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1739        // and instead continually penalize peers that repeatedly send this message.
1740        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1741            return;
1742        }
1743
1744        if !self.duplicate_cache.insert(msg_id.clone()) {
1745            tracing::debug!(message=%msg_id, "Message already received, ignoring");
1746            if let Some((peer_score, ..)) = &mut self.peer_score {
1747                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1748            }
1749            self.mcache.observe_duplicate(&msg_id, propagation_source);
1750            return;
1751        }
1752        tracing::debug!(
1753            message=%msg_id,
1754            "Put message in duplicate_cache and resolve promises"
1755        );
1756
1757        // Record the received message with the metrics
1758        if let Some(metrics) = self.metrics.as_mut() {
1759            metrics.msg_recvd(&message.topic);
1760        }
1761
1762        // Tells score that message arrived (but is maybe not fully validated yet).
1763        // Consider the message as delivered for gossip promises.
1764        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1765            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1766            gossip_promises.message_delivered(&msg_id);
1767        }
1768
1769        // Add the message to our memcache
1770        self.mcache.put(&msg_id, raw_message.clone());
1771
1772        // Dispatch the message to the user if we are subscribed to any of the topics
1773        if self.mesh.contains_key(&message.topic) {
1774            tracing::debug!("Sending received message to user");
1775            self.events
1776                .push_back(ToSwarm::GenerateEvent(Event::Message {
1777                    propagation_source: *propagation_source,
1778                    message_id: msg_id.clone(),
1779                    message,
1780                }));
1781        } else {
1782            tracing::debug!(
1783                topic=%message.topic,
1784                "Received message on a topic we are not subscribed to"
1785            );
1786            return;
1787        }
1788
1789        // forward the message to mesh peers, if no validation is required
1790        if !self.config.validate_messages() {
1791            self.forward_msg(
1792                &msg_id,
1793                raw_message,
1794                Some(propagation_source),
1795                HashSet::new(),
1796            );
1797            tracing::debug!(message=%msg_id, "Completed message handling for message");
1798        }
1799    }
1800
1801    // Handles invalid messages received.
1802    fn handle_invalid_message(
1803        &mut self,
1804        propagation_source: &PeerId,
1805        raw_message: &RawMessage,
1806        reject_reason: RejectReason,
1807    ) {
1808        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1809            if let Some(metrics) = self.metrics.as_mut() {
1810                metrics.register_invalid_message(&raw_message.topic);
1811            }
1812
1813            if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) {
1814                let message_id = self.config.message_id(&message);
1815
1816                peer_score.reject_message(
1817                    propagation_source,
1818                    &message_id,
1819                    &message.topic,
1820                    reject_reason,
1821                );
1822
1823                gossip_promises.reject_message(&message_id, &reject_reason);
1824            } else {
1825                // The message is invalid, we reject it ignoring any gossip promises. If a peer is
1826                // advertising this message via an IHAVE and it's invalid it will be double
1827                // penalized, one for sending us an invalid and again for breaking a promise.
1828                peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1829            }
1830        }
1831    }
1832
1833    /// Handles received subscriptions.
1834    fn handle_received_subscriptions(
1835        &mut self,
1836        subscriptions: &[Subscription],
1837        propagation_source: &PeerId,
1838    ) {
1839        tracing::debug!(
1840            source=%propagation_source,
1841            "Handling subscriptions: {:?}",
1842            subscriptions,
1843        );
1844
1845        let mut unsubscribed_peers = Vec::new();
1846
1847        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1848            tracing::error!(
1849                peer=%propagation_source,
1850                "Subscription by unknown peer"
1851            );
1852            return;
1853        };
1854
1855        // Collect potential graft topics for the peer.
1856        let mut topics_to_graft = Vec::new();
1857
1858        // Notify the application about the subscription, after the grafts are sent.
1859        let mut application_event = Vec::new();
1860
1861        let filtered_topics = match self
1862            .subscription_filter
1863            .filter_incoming_subscriptions(subscriptions, &peer.topics)
1864        {
1865            Ok(topics) => topics,
1866            Err(s) => {
1867                tracing::error!(
1868                    peer=%propagation_source,
1869                    "Subscription filter error: {}; ignoring RPC from peer",
1870                    s
1871                );
1872                return;
1873            }
1874        };
1875
1876        for subscription in filtered_topics {
1877            // get the peers from the mapping, or insert empty lists if the topic doesn't exist
1878            let topic_hash = &subscription.topic_hash;
1879
1880            match subscription.action {
1881                SubscriptionAction::Subscribe => {
1882                    if peer.topics.insert(topic_hash.clone()) {
1883                        tracing::debug!(
1884                            peer=%propagation_source,
1885                            topic=%topic_hash,
1886                            "SUBSCRIPTION: Adding gossip peer to topic"
1887                        );
1888
1889                        if let Some(m) = self.metrics.as_mut() {
1890                            m.inc_topic_peers(topic_hash);
1891                        }
1892                    }
1893
1894                    // if the mesh needs peers add the peer to the mesh
1895                    if !self.explicit_peers.contains(propagation_source)
1896                        && matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub)
1897                        && !Self::score_below_threshold_from_scores(
1898                            &self.peer_score,
1899                            propagation_source,
1900                            |_| 0.0,
1901                        )
1902                        .0
1903                        && !self
1904                            .backoffs
1905                            .is_backoff_with_slack(topic_hash, propagation_source)
1906                    {
1907                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1908                            if peers.len() < self.config.mesh_n_low()
1909                                && peers.insert(*propagation_source)
1910                            {
1911                                tracing::debug!(
1912                                    peer=%propagation_source,
1913                                    topic=%topic_hash,
1914                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
1915                                );
1916                                if let Some(m) = self.metrics.as_mut() {
1917                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1918                                }
1919                                // send graft to the peer
1920                                tracing::debug!(
1921                                    peer=%propagation_source,
1922                                    topic=%topic_hash,
1923                                    "Sending GRAFT to peer for topic"
1924                                );
1925                                if let Some((peer_score, ..)) = &mut self.peer_score {
1926                                    peer_score.graft(propagation_source, topic_hash.clone());
1927                                }
1928                                topics_to_graft.push(topic_hash.clone());
1929                            }
1930                        }
1931                    }
1932                    // generates a subscription event to be polled
1933                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1934                        peer_id: *propagation_source,
1935                        topic: topic_hash.clone(),
1936                    }));
1937                }
1938                SubscriptionAction::Unsubscribe => {
1939                    if peer.topics.remove(topic_hash) {
1940                        tracing::debug!(
1941                            peer=%propagation_source,
1942                            topic=%topic_hash,
1943                            "SUBSCRIPTION: Removing gossip peer from topic"
1944                        );
1945
1946                        if let Some(m) = self.metrics.as_mut() {
1947                            m.dec_topic_peers(topic_hash);
1948                        }
1949                    }
1950
1951                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
1952                    // generate an unsubscribe event to be polled
1953                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
1954                        peer_id: *propagation_source,
1955                        topic: topic_hash.clone(),
1956                    }));
1957                }
1958            }
1959        }
1960
1961        // remove unsubscribed peers from the mesh and fanout if they exist there.
1962        for (peer_id, topic_hash) in unsubscribed_peers {
1963            self.fanout
1964                .get_mut(&topic_hash)
1965                .map(|peers| peers.remove(&peer_id));
1966            self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
1967        }
1968
1969        // Potentially inform the handler if we have added this peer to a mesh for the first time.
1970        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
1971        if !topics_joined.is_empty() {
1972            peer_added_to_mesh(
1973                *propagation_source,
1974                topics_joined,
1975                &self.mesh,
1976                &mut self.events,
1977                &self.connected_peers,
1978            );
1979        }
1980
1981        // If we need to send grafts to peer, do so immediately, rather than waiting for the
1982        // heartbeat.
1983        for topic_hash in topics_to_graft.into_iter() {
1984            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
1985        }
1986
1987        // Notify the application of the subscriptions
1988        for event in application_event {
1989            self.events.push_back(event);
1990        }
1991
1992        tracing::trace!(
1993            source=%propagation_source,
1994            "Completed handling subscriptions from source"
1995        );
1996    }
1997
1998    /// Applies penalties to peers that did not respond to our IWANT requests.
1999    fn apply_iwant_penalties(&mut self) {
2000        if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
2001            for (peer, count) in gossip_promises.get_broken_promises() {
2002                peer_score.add_penalty(&peer, count);
2003                if let Some(metrics) = self.metrics.as_mut() {
2004                    metrics.register_score_penalty(Penalty::BrokenPromise);
2005                }
2006            }
2007        }
2008    }
2009
2010    /// Heartbeat function which shifts the memcache and updates the mesh.
2011    fn heartbeat(&mut self) {
2012        tracing::debug!("Starting heartbeat");
2013        let start = Instant::now();
2014
2015        // Every heartbeat we sample the send queues to add to our metrics. We do this intentionally
2016        // before we add all the gossip from this heartbeat in order to gain a true measure of
2017        // steady-state size of the queues.
2018        if let Some(m) = &mut self.metrics {
2019            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2020                m.observe_priority_queue_size(sender_queue.priority_queue_len());
2021                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2022            }
2023        }
2024
2025        self.heartbeat_ticks += 1;
2026
2027        let mut to_graft = HashMap::new();
2028        let mut to_prune = HashMap::new();
2029        let mut no_px = HashSet::new();
2030
2031        // clean up expired backoffs
2032        self.backoffs.heartbeat();
2033
2034        // clean up ihave counters
2035        self.count_sent_iwant.clear();
2036        self.count_received_ihave.clear();
2037
2038        // apply iwant penalties
2039        self.apply_iwant_penalties();
2040
2041        // check connections to explicit peers
2042        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2043            for p in self.explicit_peers.clone() {
2044                self.check_explicit_peer_connection(&p);
2045            }
2046        }
2047
2048        // Cache the scores of all connected peers, and record metrics for current penalties.
2049        let mut scores = HashMap::with_capacity(self.connected_peers.len());
2050        if let Some((peer_score, ..)) = &self.peer_score {
2051            for peer_id in self.connected_peers.keys() {
2052                scores
2053                    .entry(peer_id)
2054                    .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut()));
2055            }
2056        }
2057
2058        // maintain the mesh for each topic
2059        for (topic_hash, peers) in self.mesh.iter_mut() {
2060            let explicit_peers = &self.explicit_peers;
2061            let backoffs = &self.backoffs;
2062            let outbound_peers = &self.outbound_peers;
2063
2064            // drop all peers with negative score, without PX
2065            // if there is at some point a stable retain method for BTreeSet the following can be
2066            // written more efficiently with retain.
2067            let mut to_remove_peers = Vec::new();
2068            for peer_id in peers.iter() {
2069                let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2070
2071                // Record the score per mesh
2072                if let Some(metrics) = self.metrics.as_mut() {
2073                    metrics.observe_mesh_peers_score(topic_hash, peer_score);
2074                }
2075
2076                if peer_score < 0.0 {
2077                    tracing::debug!(
2078                        peer=%peer_id,
2079                        score=%peer_score,
2080                        topic=%topic_hash,
2081                        "HEARTBEAT: Prune peer with negative score"
2082                    );
2083
2084                    let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2085                    current_topic.push(topic_hash.clone());
2086                    no_px.insert(*peer_id);
2087                    to_remove_peers.push(*peer_id);
2088                }
2089            }
2090
2091            if let Some(m) = self.metrics.as_mut() {
2092                m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2093            }
2094
2095            for peer_id in to_remove_peers {
2096                peers.remove(&peer_id);
2097            }
2098
2099            // too little peers - add some
2100            if peers.len() < self.config.mesh_n_low() {
2101                tracing::debug!(
2102                    topic=%topic_hash,
2103                    "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2104                    peers.len(),
2105                    self.config.mesh_n_low()
2106                );
2107                // not enough peers - get mesh_n - current_length more
2108                let desired_peers = self.config.mesh_n() - peers.len();
2109                let peer_list =
2110                    get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2111                        !peers.contains(peer)
2112                            && !explicit_peers.contains(peer)
2113                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
2114                            && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2115                    });
2116                for peer in &peer_list {
2117                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2118                    current_topic.push(topic_hash.clone());
2119                }
2120                // update the mesh
2121                tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2122                if let Some(m) = self.metrics.as_mut() {
2123                    m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2124                }
2125                peers.extend(peer_list);
2126            }
2127
2128            // too many peers - remove some
2129            if peers.len() > self.config.mesh_n_high() {
2130                tracing::debug!(
2131                    topic=%topic_hash,
2132                    "HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
2133                    peers.len(),
2134                    self.config.mesh_n_high()
2135                );
2136                let excess_peer_no = peers.len() - self.config.mesh_n();
2137
2138                // shuffle the peers and then sort by score ascending beginning with the worst
2139                let mut rng = thread_rng();
2140                let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2141                shuffled.shuffle(&mut rng);
2142                shuffled.sort_by(|p1, p2| {
2143                    let score_p1 = *scores.get(p1).unwrap_or(&0.0);
2144                    let score_p2 = *scores.get(p2).unwrap_or(&0.0);
2145
2146                    score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2147                });
2148                // shuffle everything except the last retain_scores many peers (the best ones)
2149                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2150
2151                // count total number of outbound peers
2152                let mut outbound = {
2153                    let outbound_peers = &self.outbound_peers;
2154                    shuffled
2155                        .iter()
2156                        .filter(|p| outbound_peers.contains(*p))
2157                        .count()
2158                };
2159
2160                // remove the first excess_peer_no allowed (by outbound restrictions) peers adding
2161                // them to to_prune
2162                let mut removed = 0;
2163                for peer in shuffled {
2164                    if removed == excess_peer_no {
2165                        break;
2166                    }
2167                    if self.outbound_peers.contains(&peer) {
2168                        if outbound <= self.config.mesh_outbound_min() {
2169                            // do not remove anymore outbound peers
2170                            continue;
2171                        }
2172                        // an outbound peer gets removed
2173                        outbound -= 1;
2174                    }
2175
2176                    // remove the peer
2177                    peers.remove(&peer);
2178                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2179                    current_topic.push(topic_hash.clone());
2180                    removed += 1;
2181                }
2182
2183                if let Some(m) = self.metrics.as_mut() {
2184                    m.peers_removed(topic_hash, Churn::Excess, removed)
2185                }
2186            }
2187
2188            // do we have enough outbound peers?
2189            if peers.len() >= self.config.mesh_n_low() {
2190                // count number of outbound peers we have
2191                let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2192
2193                // if we have not enough outbound peers, graft to some new outbound peers
2194                if outbound < self.config.mesh_outbound_min() {
2195                    let needed = self.config.mesh_outbound_min() - outbound;
2196                    let peer_list =
2197                        get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
2198                            !peers.contains(peer)
2199                                && !explicit_peers.contains(peer)
2200                                && !backoffs.is_backoff_with_slack(topic_hash, peer)
2201                                && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2202                                && outbound_peers.contains(peer)
2203                        });
2204                    for peer in &peer_list {
2205                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2206                        current_topic.push(topic_hash.clone());
2207                    }
2208                    // update the mesh
2209                    tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2210                    if let Some(m) = self.metrics.as_mut() {
2211                        m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2212                    }
2213                    peers.extend(peer_list);
2214                }
2215            }
2216
2217            // should we try to improve the mesh with opportunistic grafting?
2218            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2219                && peers.len() > 1
2220                && self.peer_score.is_some()
2221            {
2222                if let Some((_, thresholds, _, _)) = &self.peer_score {
2223                    // Opportunistic grafting works as follows: we check the median score of peers
2224                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
2225                    // select a few peers at random with score over the median.
2226                    // The intention is to (slowly) improve an underperforming mesh by introducing
2227                    // good scoring peers that may have been gossiping at us. This allows us to
2228                    // get out of sticky situations where we are stuck with poor peers and also
2229                    // recover from churn of good peers.
2230
2231                    // now compute the median peer score in the mesh
2232                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2233                    peers_by_score.sort_by(|p1, p2| {
2234                        let p1_score = *scores.get(p1).unwrap_or(&0.0);
2235                        let p2_score = *scores.get(p2).unwrap_or(&0.0);
2236                        p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2237                    });
2238
2239                    let middle = peers_by_score.len() / 2;
2240                    let median = if peers_by_score.len() % 2 == 0 {
2241                        let sub_middle_peer = *peers_by_score
2242                            .get(middle - 1)
2243                            .expect("middle < vector length and middle > 0 since peers.len() > 0");
2244                        let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
2245                        let middle_peer =
2246                            *peers_by_score.get(middle).expect("middle < vector length");
2247                        let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
2248
2249                        (sub_middle_score + middle_score) * 0.5
2250                    } else {
2251                        *scores
2252                            .get(*peers_by_score.get(middle).expect("middle < vector length"))
2253                            .unwrap_or(&0.0)
2254                    };
2255
2256                    // if the median score is below the threshold, select a better peer (if any) and
2257                    // GRAFT
2258                    if median < thresholds.opportunistic_graft_threshold {
2259                        let peer_list = get_random_peers(
2260                            &self.connected_peers,
2261                            topic_hash,
2262                            self.config.opportunistic_graft_peers(),
2263                            |peer_id| {
2264                                !peers.contains(peer_id)
2265                                    && !explicit_peers.contains(peer_id)
2266                                    && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2267                                    && *scores.get(peer_id).unwrap_or(&0.0) > median
2268                            },
2269                        );
2270                        for peer in &peer_list {
2271                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2272                            current_topic.push(topic_hash.clone());
2273                        }
2274                        // update the mesh
2275                        tracing::debug!(
2276                            topic=%topic_hash,
2277                            "Opportunistically graft in topic with peers {:?}",
2278                            peer_list
2279                        );
2280                        if let Some(m) = self.metrics.as_mut() {
2281                            m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2282                        }
2283                        peers.extend(peer_list);
2284                    }
2285                }
2286            }
2287            // Register the final count of peers in the mesh
2288            if let Some(m) = self.metrics.as_mut() {
2289                m.set_mesh_peers(topic_hash, peers.len())
2290            }
2291        }
2292
2293        // remove expired fanout topics
2294        {
2295            let fanout = &mut self.fanout; // help the borrow checker
2296            let fanout_ttl = self.config.fanout_ttl();
2297            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2298                if *last_pub_time + fanout_ttl < Instant::now() {
2299                    tracing::debug!(
2300                        topic=%topic_hash,
2301                        "HEARTBEAT: Fanout topic removed due to timeout"
2302                    );
2303                    fanout.remove(topic_hash);
2304                    return false;
2305                }
2306                true
2307            });
2308        }
2309
2310        // maintain fanout
2311        // check if our peers are still a part of the topic
2312        for (topic_hash, peers) in self.fanout.iter_mut() {
2313            let mut to_remove_peers = Vec::new();
2314            let publish_threshold = match &self.peer_score {
2315                Some((_, thresholds, _, _)) => thresholds.publish_threshold,
2316                _ => 0.0,
2317            };
2318            for peer_id in peers.iter() {
2319                // is the peer still subscribed to the topic?
2320                let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2321                match self.connected_peers.get(peer_id) {
2322                    Some(peer) => {
2323                        if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2324                            tracing::debug!(
2325                                topic=%topic_hash,
2326                                "HEARTBEAT: Peer removed from fanout for topic"
2327                            );
2328                            to_remove_peers.push(*peer_id);
2329                        }
2330                    }
2331                    None => {
2332                        // remove if the peer has disconnected
2333                        to_remove_peers.push(*peer_id);
2334                    }
2335                }
2336            }
2337            for to_remove in to_remove_peers {
2338                peers.remove(&to_remove);
2339            }
2340
2341            // not enough peers
2342            if peers.len() < self.config.mesh_n() {
2343                tracing::debug!(
2344                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2345                    peers.len(),
2346                    self.config.mesh_n()
2347                );
2348                let needed_peers = self.config.mesh_n() - peers.len();
2349                let explicit_peers = &self.explicit_peers;
2350                let new_peers =
2351                    get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2352                        !peers.contains(peer_id)
2353                            && !explicit_peers.contains(peer_id)
2354                            && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold
2355                    });
2356                peers.extend(new_peers);
2357            }
2358        }
2359
2360        if self.peer_score.is_some() {
2361            tracing::trace!("Mesh message deliveries: {:?}", {
2362                self.mesh
2363                    .iter()
2364                    .map(|(t, peers)| {
2365                        (
2366                            t.clone(),
2367                            peers
2368                                .iter()
2369                                .map(|p| {
2370                                    (
2371                                        *p,
2372                                        self.peer_score
2373                                            .as_ref()
2374                                            .expect("peer_score.is_some()")
2375                                            .0
2376                                            .mesh_message_deliveries(p, t)
2377                                            .unwrap_or(0.0),
2378                                    )
2379                                })
2380                                .collect::<HashMap<PeerId, f64>>(),
2381                        )
2382                    })
2383                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2384            })
2385        }
2386
2387        self.emit_gossip();
2388
2389        // send graft/prunes
2390        if !to_graft.is_empty() | !to_prune.is_empty() {
2391            self.send_graft_prune(to_graft, to_prune, no_px);
2392        }
2393
2394        // shift the memcache
2395        self.mcache.shift();
2396
2397        // Report expired messages
2398        for (peer_id, failed_messages) in self.failed_messages.drain() {
2399            tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2400            self.events
2401                .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2402                    peer_id,
2403                    failed_messages,
2404                }));
2405        }
2406        self.failed_messages.shrink_to_fit();
2407
2408        tracing::debug!("Completed Heartbeat");
2409        if let Some(metrics) = self.metrics.as_mut() {
2410            let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2411            metrics.observe_heartbeat_duration(duration);
2412        }
2413    }
2414
2415    /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
2416    /// and fanout peers
2417    fn emit_gossip(&mut self) {
2418        let mut rng = thread_rng();
2419        let mut messages = Vec::new();
2420        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2421            let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2422            if message_ids.is_empty() {
2423                continue;
2424            }
2425
2426            // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list
2427            if message_ids.len() > self.config.max_ihave_length() {
2428                // we do the truncation (with shuffling) per peer below
2429                tracing::debug!(
2430                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2431                    message_ids.len()
2432                );
2433            } else {
2434                // shuffle to emit in random order
2435                message_ids.shuffle(&mut rng);
2436            }
2437
2438            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2439            let n_map = |m| {
2440                max(
2441                    self.config.gossip_lazy(),
2442                    (self.config.gossip_factor() * m as f64) as usize,
2443                )
2444            };
2445            // get gossip_lazy random peers
2446            let to_msg_peers =
2447                get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2448                    !peers.contains(peer)
2449                        && !self.explicit_peers.contains(peer)
2450                        && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2451                });
2452
2453            tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2454
2455            for peer_id in to_msg_peers {
2456                let mut peer_message_ids = message_ids.clone();
2457
2458                if peer_message_ids.len() > self.config.max_ihave_length() {
2459                    // We do this per peer so that we emit a different set for each peer.
2460                    // we have enough redundancy in the system that this will significantly increase
2461                    // the message coverage when we do truncate.
2462                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2463                    peer_message_ids.truncate(self.config.max_ihave_length());
2464                }
2465
2466                // send an IHAVE message
2467                messages.push((
2468                    peer_id,
2469                    RpcOut::IHave(IHave {
2470                        topic_hash: topic_hash.clone(),
2471                        message_ids: peer_message_ids,
2472                    }),
2473                ));
2474            }
2475        }
2476        for (peer_id, message) in messages {
2477            self.send_message(peer_id, message);
2478        }
2479    }
2480
2481    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2482    /// messages.
2483    fn send_graft_prune(
2484        &mut self,
2485        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2486        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2487        no_px: HashSet<PeerId>,
2488    ) {
2489        // handle the grafts and overlapping prunes per peer
2490        for (peer_id, topics) in to_graft.into_iter() {
2491            for topic in &topics {
2492                // inform scoring of graft
2493                if let Some((peer_score, ..)) = &mut self.peer_score {
2494                    peer_score.graft(&peer_id, topic.clone());
2495                }
2496
2497                // inform the handler of the peer being added to the mesh
2498                // If the peer did not previously exist in any mesh, inform the handler
2499                peer_added_to_mesh(
2500                    peer_id,
2501                    vec![topic],
2502                    &self.mesh,
2503                    &mut self.events,
2504                    &self.connected_peers,
2505                );
2506            }
2507            let rpc_msgs = topics.iter().map(|topic_hash| {
2508                RpcOut::Graft(Graft {
2509                    topic_hash: topic_hash.clone(),
2510                })
2511            });
2512
2513            // If there are prunes associated with the same peer add them.
2514            // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
2515            // It therefore must be in at least one mesh and we do not need to inform the handler
2516            // of its removal from another.
2517
2518            // The following prunes are not due to unsubscribing.
2519            let prune_msgs = to_prune
2520                .remove(&peer_id)
2521                .into_iter()
2522                .flatten()
2523                .map(|topic_hash| {
2524                    let prune = self.make_prune(
2525                        &topic_hash,
2526                        &peer_id,
2527                        self.config.do_px() && !no_px.contains(&peer_id),
2528                        false,
2529                    );
2530                    RpcOut::Prune(prune)
2531                });
2532
2533            // send the rpc messages
2534            for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2535                self.send_message(peer_id, msg);
2536            }
2537        }
2538
2539        // handle the remaining prunes
2540        // The following prunes are not due to unsubscribing.
2541        for (peer_id, topics) in to_prune.iter() {
2542            for topic_hash in topics {
2543                let prune = self.make_prune(
2544                    topic_hash,
2545                    peer_id,
2546                    self.config.do_px() && !no_px.contains(peer_id),
2547                    false,
2548                );
2549                self.send_message(*peer_id, RpcOut::Prune(prune));
2550
2551                // inform the handler
2552                peer_removed_from_mesh(
2553                    *peer_id,
2554                    topic_hash,
2555                    &self.mesh,
2556                    &mut self.events,
2557                    &self.connected_peers,
2558                );
2559            }
2560        }
2561    }
2562
2563    /// Helper function which forwards a message to mesh\[topic\] peers.
2564    ///
2565    /// Returns true if at least one peer was messaged.
2566    fn forward_msg(
2567        &mut self,
2568        msg_id: &MessageId,
2569        message: RawMessage,
2570        propagation_source: Option<&PeerId>,
2571        originating_peers: HashSet<PeerId>,
2572    ) -> bool {
2573        // message is fully validated inform peer_score
2574        if let Some((peer_score, ..)) = &mut self.peer_score {
2575            if let Some(peer) = propagation_source {
2576                peer_score.deliver_message(peer, msg_id, &message.topic);
2577            }
2578        }
2579
2580        tracing::debug!(message=%msg_id, "Forwarding message");
2581        let mut recipient_peers = HashSet::new();
2582
2583        // Populate the recipient peers mapping
2584
2585        // Add explicit peers
2586        for peer_id in &self.explicit_peers {
2587            let Some(peer) = self.connected_peers.get(peer_id) else {
2588                continue;
2589            };
2590            if Some(peer_id) != propagation_source
2591                && !originating_peers.contains(peer_id)
2592                && Some(peer_id) != message.source.as_ref()
2593                && peer.topics.contains(&message.topic)
2594            {
2595                recipient_peers.insert(*peer_id);
2596            }
2597        }
2598
2599        // add mesh peers
2600        let topic = &message.topic;
2601        // mesh
2602        if let Some(mesh_peers) = self.mesh.get(topic) {
2603            for peer_id in mesh_peers {
2604                if Some(peer_id) != propagation_source
2605                    && !originating_peers.contains(peer_id)
2606                    && Some(peer_id) != message.source.as_ref()
2607                {
2608                    recipient_peers.insert(*peer_id);
2609                }
2610            }
2611        }
2612
2613        if recipient_peers.is_empty() {
2614            return false;
2615        }
2616
2617        // forward the message to peers
2618        for peer in recipient_peers.iter() {
2619            let event = RpcOut::Forward {
2620                message: message.clone(),
2621                timeout: Delay::new(self.config.forward_queue_duration()),
2622            };
2623            tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
2624            self.send_message(*peer, event);
2625        }
2626        tracing::debug!("Completed forwarding message");
2627        true
2628    }
2629
2630    /// Constructs a [`RawMessage`] performing message signing if required.
2631    pub(crate) fn build_raw_message(
2632        &mut self,
2633        topic: TopicHash,
2634        data: Vec<u8>,
2635    ) -> Result<RawMessage, PublishError> {
2636        match &mut self.publish_config {
2637            PublishConfig::Signing {
2638                ref keypair,
2639                author,
2640                inline_key,
2641                last_seq_no,
2642            } => {
2643                let sequence_number = last_seq_no.next();
2644
2645                let signature = {
2646                    let message = proto::Message {
2647                        from: Some(author.to_bytes()),
2648                        data: Some(data.clone()),
2649                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2650                        topic: topic.clone().into_string(),
2651                        signature: None,
2652                        key: None,
2653                    };
2654
2655                    let mut buf = Vec::with_capacity(message.get_size());
2656                    let mut writer = Writer::new(&mut buf);
2657
2658                    message
2659                        .write_message(&mut writer)
2660                        .expect("Encoding to succeed");
2661
2662                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2663                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2664                    signature_bytes.extend_from_slice(&buf);
2665                    Some(keypair.sign(&signature_bytes)?)
2666                };
2667
2668                Ok(RawMessage {
2669                    source: Some(*author),
2670                    data,
2671                    // To be interoperable with the go-implementation this is treated as a 64-bit
2672                    // big-endian uint.
2673                    sequence_number: Some(sequence_number),
2674                    topic,
2675                    signature,
2676                    key: inline_key.clone(),
2677                    validated: true, // all published messages are valid
2678                })
2679            }
2680            PublishConfig::Author(peer_id) => {
2681                Ok(RawMessage {
2682                    source: Some(*peer_id),
2683                    data,
2684                    // To be interoperable with the go-implementation this is treated as a 64-bit
2685                    // big-endian uint.
2686                    sequence_number: Some(rand::random()),
2687                    topic,
2688                    signature: None,
2689                    key: None,
2690                    validated: true, // all published messages are valid
2691                })
2692            }
2693            PublishConfig::RandomAuthor => {
2694                Ok(RawMessage {
2695                    source: Some(PeerId::random()),
2696                    data,
2697                    // To be interoperable with the go-implementation this is treated as a 64-bit
2698                    // big-endian uint.
2699                    sequence_number: Some(rand::random()),
2700                    topic,
2701                    signature: None,
2702                    key: None,
2703                    validated: true, // all published messages are valid
2704                })
2705            }
2706            PublishConfig::Anonymous => {
2707                Ok(RawMessage {
2708                    source: None,
2709                    data,
2710                    // To be interoperable with the go-implementation this is treated as a 64-bit
2711                    // big-endian uint.
2712                    sequence_number: None,
2713                    topic,
2714                    signature: None,
2715                    key: None,
2716                    validated: true, // all published messages are valid
2717                })
2718            }
2719        }
2720    }
2721
2722    /// Send a [`RpcOut`] message to a peer.
2723    ///
2724    /// Returns `true` if sending was successful, `false` otherwise.
2725    /// The method will update the peer score and failed message counter if
2726    /// sending the message failed due to the channel to the connection handler being
2727    /// full (which indicates a slow peer).
2728    fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2729        if let Some(m) = self.metrics.as_mut() {
2730            if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2731                // register bytes sent on the internal metrics.
2732                m.msg_sent(&message.topic, message.raw_protobuf_len());
2733            }
2734        }
2735
2736        let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2737            tracing::error!(peer = %peer_id,
2738                    "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2739            return false;
2740        };
2741
2742        // Try sending the message to the connection handler.
2743        match peer.sender.send_message(rpc) {
2744            Ok(()) => true,
2745            Err(rpc) => {
2746                // Sending failed because the channel is full.
2747                tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2748
2749                // Update failed message counter.
2750                let failed_messages = self.failed_messages.entry(peer_id).or_default();
2751                match rpc {
2752                    RpcOut::Publish { .. } => {
2753                        failed_messages.priority += 1;
2754                        failed_messages.publish += 1;
2755                    }
2756                    RpcOut::Forward { .. } => {
2757                        failed_messages.non_priority += 1;
2758                        failed_messages.forward += 1;
2759                    }
2760                    RpcOut::IWant(_) | RpcOut::IHave(_) => {
2761                        failed_messages.non_priority += 1;
2762                    }
2763                    RpcOut::Graft(_)
2764                    | RpcOut::Prune(_)
2765                    | RpcOut::Subscribe(_)
2766                    | RpcOut::Unsubscribe(_) => {
2767                        unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2768                    }
2769                }
2770
2771                // Update peer score.
2772                if let Some((peer_score, ..)) = &mut self.peer_score {
2773                    peer_score.failed_message_slow_peer(&peer_id);
2774                }
2775
2776                false
2777            }
2778        }
2779    }
2780
2781    fn on_connection_established(
2782        &mut self,
2783        ConnectionEstablished {
2784            peer_id,
2785            endpoint,
2786            other_established,
2787            ..
2788        }: ConnectionEstablished,
2789    ) {
2790        // Diverging from the go implementation we only want to consider a peer as outbound peer
2791        // if its first connection is outbound.
2792
2793        if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
2794            // The first connection is outbound and it is not a peer from peer exchange => mark
2795            // it as outbound peer
2796            self.outbound_peers.insert(peer_id);
2797        }
2798
2799        // Add the IP to the peer scoring system
2800        if let Some((peer_score, ..)) = &mut self.peer_score {
2801            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2802                peer_score.add_ip(&peer_id, ip);
2803            } else {
2804                tracing::trace!(
2805                    peer=%peer_id,
2806                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2807                    endpoint
2808                )
2809            }
2810        }
2811
2812        if other_established > 0 {
2813            return; // Not our first connection to this peer, hence nothing to do.
2814        }
2815
2816        if let Some((peer_score, ..)) = &mut self.peer_score {
2817            peer_score.add_peer(peer_id);
2818        }
2819
2820        // Ignore connections from blacklisted peers.
2821        if self.blacklisted_peers.contains(&peer_id) {
2822            tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2823            return;
2824        }
2825
2826        tracing::debug!(peer=%peer_id, "New peer connected");
2827        // We need to send our subscriptions to the newly-connected node.
2828        for topic_hash in self.mesh.clone().into_keys() {
2829            self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2830        }
2831    }
2832
2833    fn on_connection_closed(
2834        &mut self,
2835        ConnectionClosed {
2836            peer_id,
2837            connection_id,
2838            endpoint,
2839            remaining_established,
2840            ..
2841        }: ConnectionClosed,
2842    ) {
2843        // Remove IP from peer scoring system
2844        if let Some((peer_score, ..)) = &mut self.peer_score {
2845            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2846                peer_score.remove_ip(&peer_id, &ip);
2847            } else {
2848                tracing::trace!(
2849                    peer=%peer_id,
2850                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2851                    endpoint
2852                )
2853            }
2854        }
2855
2856        if remaining_established != 0 {
2857            // Remove the connection from the list
2858            if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2859                let index = peer
2860                    .connections
2861                    .iter()
2862                    .position(|v| v == &connection_id)
2863                    .expect("Previously established connection to peer must be present");
2864                peer.connections.remove(index);
2865
2866                // If there are more connections and this peer is in a mesh, inform the first
2867                // connection handler.
2868                if !peer.connections.is_empty() {
2869                    for topic in &peer.topics {
2870                        if let Some(mesh_peers) = self.mesh.get(topic) {
2871                            if mesh_peers.contains(&peer_id) {
2872                                self.events.push_back(ToSwarm::NotifyHandler {
2873                                    peer_id,
2874                                    event: HandlerIn::JoinedMesh,
2875                                    handler: NotifyHandler::One(peer.connections[0]),
2876                                });
2877                                break;
2878                            }
2879                        }
2880                    }
2881                }
2882            }
2883        } else {
2884            // remove from mesh, topic_peers, peer_topic and the fanout
2885            tracing::debug!(peer=%peer_id, "Peer disconnected");
2886            let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
2887                tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
2888                return;
2889            };
2890
2891            // remove peer from all mappings
2892            for topic in &connected_peer.topics {
2893                // check the mesh for the topic
2894                if let Some(mesh_peers) = self.mesh.get_mut(topic) {
2895                    // check if the peer is in the mesh and remove it
2896                    if mesh_peers.remove(&peer_id) {
2897                        if let Some(m) = self.metrics.as_mut() {
2898                            m.peers_removed(topic, Churn::Dc, 1);
2899                            m.set_mesh_peers(topic, mesh_peers.len());
2900                        }
2901                    };
2902                }
2903
2904                if let Some(m) = self.metrics.as_mut() {
2905                    m.dec_topic_peers(topic);
2906                }
2907
2908                // remove from fanout
2909                self.fanout
2910                    .get_mut(topic)
2911                    .map(|peers| peers.remove(&peer_id));
2912            }
2913
2914            // Forget px and outbound status for this peer
2915            self.px_peers.remove(&peer_id);
2916            self.outbound_peers.remove(&peer_id);
2917
2918            // If metrics are enabled, register the disconnection of a peer based on its protocol.
2919            if let Some(metrics) = self.metrics.as_mut() {
2920                metrics.peer_protocol_disconnected(connected_peer.kind.clone());
2921            }
2922
2923            self.connected_peers.remove(&peer_id);
2924
2925            if let Some((peer_score, ..)) = &mut self.peer_score {
2926                peer_score.remove_peer(&peer_id);
2927            }
2928        }
2929    }
2930
2931    fn on_address_change(
2932        &mut self,
2933        AddressChange {
2934            peer_id,
2935            old: endpoint_old,
2936            new: endpoint_new,
2937            ..
2938        }: AddressChange,
2939    ) {
2940        // Exchange IP in peer scoring system
2941        if let Some((peer_score, ..)) = &mut self.peer_score {
2942            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
2943                peer_score.remove_ip(&peer_id, &ip);
2944            } else {
2945                tracing::trace!(
2946                    peer=%&peer_id,
2947                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2948                    endpoint_old
2949                )
2950            }
2951            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
2952                peer_score.add_ip(&peer_id, ip);
2953            } else {
2954                tracing::trace!(
2955                    peer=%peer_id,
2956                    "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2957                    endpoint_new
2958                )
2959            }
2960        }
2961    }
2962}
2963
2964fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
2965    addr.iter().find_map(|p| match p {
2966        Ip4(addr) => Some(IpAddr::V4(addr)),
2967        Ip6(addr) => Some(IpAddr::V6(addr)),
2968        _ => None,
2969    })
2970}
2971
2972impl<C, F> NetworkBehaviour for Behaviour<C, F>
2973where
2974    C: Send + 'static + DataTransform,
2975    F: Send + 'static + TopicSubscriptionFilter,
2976{
2977    type ConnectionHandler = Handler;
2978    type ToSwarm = Event;
2979
2980    fn handle_established_inbound_connection(
2981        &mut self,
2982        connection_id: ConnectionId,
2983        peer_id: PeerId,
2984        _: &Multiaddr,
2985        _: &Multiaddr,
2986    ) -> Result<THandler<Self>, ConnectionDenied> {
2987        // By default we assume a peer is only a floodsub peer.
2988        //
2989        // The protocol negotiation occurs once a message is sent/received. Once this happens we
2990        // update the type of peer that this is in order to determine which kind of routing should
2991        // occur.
2992        let connected_peer = self
2993            .connected_peers
2994            .entry(peer_id)
2995            .or_insert(PeerConnections {
2996                kind: PeerKind::Floodsub,
2997                connections: vec![],
2998                sender: Sender::new(self.config.connection_handler_queue_len()),
2999                topics: Default::default(),
3000            });
3001        // Add the new connection
3002        connected_peer.connections.push(connection_id);
3003
3004        Ok(Handler::new(
3005            self.config.protocol_config(),
3006            connected_peer.sender.new_receiver(),
3007        ))
3008    }
3009
3010    fn handle_established_outbound_connection(
3011        &mut self,
3012        connection_id: ConnectionId,
3013        peer_id: PeerId,
3014        _: &Multiaddr,
3015        _: Endpoint,
3016        _: PortUse,
3017    ) -> Result<THandler<Self>, ConnectionDenied> {
3018        let connected_peer = self
3019            .connected_peers
3020            .entry(peer_id)
3021            .or_insert(PeerConnections {
3022                kind: PeerKind::Floodsub,
3023                connections: vec![],
3024                sender: Sender::new(self.config.connection_handler_queue_len()),
3025                topics: Default::default(),
3026            });
3027        // Add the new connection
3028        connected_peer.connections.push(connection_id);
3029
3030        Ok(Handler::new(
3031            self.config.protocol_config(),
3032            connected_peer.sender.new_receiver(),
3033        ))
3034    }
3035
3036    fn on_connection_handler_event(
3037        &mut self,
3038        propagation_source: PeerId,
3039        _connection_id: ConnectionId,
3040        handler_event: THandlerOutEvent<Self>,
3041    ) {
3042        match handler_event {
3043            HandlerEvent::PeerKind(kind) => {
3044                // We have identified the protocol this peer is using
3045
3046                if let Some(metrics) = self.metrics.as_mut() {
3047                    metrics.peer_protocol_connected(kind.clone());
3048                }
3049
3050                if let PeerKind::NotSupported = kind {
3051                    tracing::debug!(
3052                        peer=%propagation_source,
3053                        "Peer does not support gossipsub protocols"
3054                    );
3055                    self.events
3056                        .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3057                            peer_id: propagation_source,
3058                        }));
3059                } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3060                    // Only change the value if the old value is Floodsub (the default set in
3061                    // `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished).
3062                    // All other PeerKind changes are ignored.
3063                    tracing::debug!(
3064                        peer=%propagation_source,
3065                        peer_type=%kind,
3066                        "New peer type found for peer"
3067                    );
3068                    if let PeerKind::Floodsub = conn.kind {
3069                        conn.kind = kind;
3070                    }
3071                }
3072            }
3073            HandlerEvent::MessageDropped(rpc) => {
3074                // Account for this in the scoring logic
3075                if let Some((peer_score, _, _, _)) = &mut self.peer_score {
3076                    peer_score.failed_message_slow_peer(&propagation_source);
3077                }
3078
3079                // Keep track of expired messages for the application layer.
3080                let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3081                failed_messages.timeout += 1;
3082                match rpc {
3083                    RpcOut::Publish { .. } => {
3084                        failed_messages.publish += 1;
3085                    }
3086                    RpcOut::Forward { .. } => {
3087                        failed_messages.forward += 1;
3088                    }
3089                    _ => {}
3090                }
3091
3092                // Record metrics on the failure.
3093                if let Some(metrics) = self.metrics.as_mut() {
3094                    match rpc {
3095                        RpcOut::Publish { message, .. } => {
3096                            metrics.publish_msg_dropped(&message.topic);
3097                            metrics.timeout_msg_dropped(&message.topic);
3098                        }
3099                        RpcOut::Forward { message, .. } => {
3100                            metrics.forward_msg_dropped(&message.topic);
3101                            metrics.timeout_msg_dropped(&message.topic);
3102                        }
3103                        _ => {}
3104                    }
3105                }
3106            }
3107            HandlerEvent::Message {
3108                rpc,
3109                invalid_messages,
3110            } => {
3111                // Handle the gossipsub RPC
3112
3113                // Handle subscriptions
3114                // Update connected peers topics
3115                if !rpc.subscriptions.is_empty() {
3116                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3117                }
3118
3119                // Check if peer is graylisted in which case we ignore the event
3120                if let (true, _) =
3121                    self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3122                {
3123                    tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
3124                    return;
3125                }
3126
3127                // Handle any invalid messages from this peer
3128                if self.peer_score.is_some() {
3129                    for (raw_message, validation_error) in invalid_messages {
3130                        self.handle_invalid_message(
3131                            &propagation_source,
3132                            &raw_message,
3133                            RejectReason::ValidationError(validation_error),
3134                        )
3135                    }
3136                } else {
3137                    // log the invalid messages
3138                    for (message, validation_error) in invalid_messages {
3139                        tracing::warn!(
3140                            peer=%propagation_source,
3141                            source=?message.source,
3142                            "Invalid message from peer. Reason: {:?}",
3143                            validation_error,
3144                        );
3145                    }
3146                }
3147
3148                // Handle messages
3149                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3150                    // Only process the amount of messages the configuration allows.
3151                    if self.config.max_messages_per_rpc().is_some()
3152                        && Some(count) >= self.config.max_messages_per_rpc()
3153                    {
3154                        tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3155                        break;
3156                    }
3157                    self.handle_received_message(raw_message, &propagation_source);
3158                }
3159
3160                // Handle control messages
3161                // group some control messages, this minimises SendEvents (code is simplified to
3162                // handle each event at a time however)
3163                let mut ihave_msgs = vec![];
3164                let mut graft_msgs = vec![];
3165                let mut prune_msgs = vec![];
3166                for control_msg in rpc.control_msgs {
3167                    match control_msg {
3168                        ControlAction::IHave(IHave {
3169                            topic_hash,
3170                            message_ids,
3171                        }) => {
3172                            ihave_msgs.push((topic_hash, message_ids));
3173                        }
3174                        ControlAction::IWant(IWant { message_ids }) => {
3175                            self.handle_iwant(&propagation_source, message_ids)
3176                        }
3177                        ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3178                        ControlAction::Prune(Prune {
3179                            topic_hash,
3180                            peers,
3181                            backoff,
3182                        }) => prune_msgs.push((topic_hash, peers, backoff)),
3183                    }
3184                }
3185                if !ihave_msgs.is_empty() {
3186                    self.handle_ihave(&propagation_source, ihave_msgs);
3187                }
3188                if !graft_msgs.is_empty() {
3189                    self.handle_graft(&propagation_source, graft_msgs);
3190                }
3191                if !prune_msgs.is_empty() {
3192                    self.handle_prune(&propagation_source, prune_msgs);
3193                }
3194            }
3195        }
3196    }
3197
3198    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3199    fn poll(
3200        &mut self,
3201        cx: &mut Context<'_>,
3202    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3203        if let Some(event) = self.events.pop_front() {
3204            return Poll::Ready(event);
3205        }
3206
3207        // update scores
3208        if let Some((peer_score, _, delay, _)) = &mut self.peer_score {
3209            if delay.poll_unpin(cx).is_ready() {
3210                peer_score.refresh_scores();
3211                delay.reset(peer_score.params.decay_interval);
3212            }
3213        }
3214
3215        if self.heartbeat.poll_unpin(cx).is_ready() {
3216            self.heartbeat();
3217            self.heartbeat.reset(self.config.heartbeat_interval());
3218        }
3219
3220        Poll::Pending
3221    }
3222
3223    fn on_swarm_event(&mut self, event: FromSwarm) {
3224        match event {
3225            FromSwarm::ConnectionEstablished(connection_established) => {
3226                self.on_connection_established(connection_established)
3227            }
3228            FromSwarm::ConnectionClosed(connection_closed) => {
3229                self.on_connection_closed(connection_closed)
3230            }
3231            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3232            _ => {}
3233        }
3234    }
3235}
3236
3237/// This is called when peers are added to any mesh. It checks if the peer existed
3238/// in any other mesh. If this is the first mesh they have joined, it queues a message to notify
3239/// the appropriate connection handler to maintain a connection.
3240fn peer_added_to_mesh(
3241    peer_id: PeerId,
3242    new_topics: Vec<&TopicHash>,
3243    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3244    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3245    connections: &HashMap<PeerId, PeerConnections>,
3246) {
3247    // Ensure there is an active connection
3248    let connection_id = match connections.get(&peer_id) {
3249        Some(p) => p
3250            .connections
3251            .first()
3252            .expect("There should be at least one connection to a peer."),
3253        None => {
3254            tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3255            return;
3256        }
3257    };
3258
3259    if let Some(peer) = connections.get(&peer_id) {
3260        for topic in &peer.topics {
3261            if !new_topics.contains(&topic) {
3262                if let Some(mesh_peers) = mesh.get(topic) {
3263                    if mesh_peers.contains(&peer_id) {
3264                        // the peer is already in a mesh for another topic
3265                        return;
3266                    }
3267                }
3268            }
3269        }
3270    }
3271    // This is the first mesh the peer has joined, inform the handler
3272    events.push_back(ToSwarm::NotifyHandler {
3273        peer_id,
3274        event: HandlerIn::JoinedMesh,
3275        handler: NotifyHandler::One(*connection_id),
3276    });
3277}
3278
3279/// This is called when peers are removed from a mesh. It checks if the peer exists
3280/// in any other mesh. If this is the last mesh they have joined, we return true, in order to
3281/// notify the handler to no longer maintain a connection.
3282fn peer_removed_from_mesh(
3283    peer_id: PeerId,
3284    old_topic: &TopicHash,
3285    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3286    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3287    connections: &HashMap<PeerId, PeerConnections>,
3288) {
3289    // Ensure there is an active connection
3290    let connection_id = match connections.get(&peer_id) {
3291        Some(p) => p
3292            .connections
3293            .first()
3294            .expect("There should be at least one connection to a peer."),
3295        None => {
3296            tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3297            return;
3298        }
3299    };
3300
3301    if let Some(peer) = connections.get(&peer_id) {
3302        for topic in &peer.topics {
3303            if topic != old_topic {
3304                if let Some(mesh_peers) = mesh.get(topic) {
3305                    if mesh_peers.contains(&peer_id) {
3306                        // the peer exists in another mesh still
3307                        return;
3308                    }
3309                }
3310            }
3311        }
3312    }
3313    // The peer is not in any other mesh, inform the handler
3314    events.push_back(ToSwarm::NotifyHandler {
3315        peer_id,
3316        event: HandlerIn::LeftMesh,
3317        handler: NotifyHandler::One(*connection_id),
3318    });
3319}
3320
3321/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3322/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
3323/// that gets as input the number of filtered peers.
3324fn get_random_peers_dynamic(
3325    connected_peers: &HashMap<PeerId, PeerConnections>,
3326    topic_hash: &TopicHash,
3327    // maps the number of total peers to the number of selected peers
3328    n_map: impl Fn(usize) -> usize,
3329    mut f: impl FnMut(&PeerId) -> bool,
3330) -> BTreeSet<PeerId> {
3331    let mut gossip_peers = connected_peers
3332        .iter()
3333        .filter(|(_, p)| p.topics.contains(topic_hash))
3334        .filter(|(peer_id, _)| f(peer_id))
3335        .filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1)
3336        .map(|(peer_id, _)| *peer_id)
3337        .collect::<Vec<PeerId>>();
3338
3339    // if we have less than needed, return them
3340    let n = n_map(gossip_peers.len());
3341    if gossip_peers.len() <= n {
3342        tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3343        return gossip_peers.into_iter().collect();
3344    }
3345
3346    // we have more peers than needed, shuffle them and return n of them
3347    let mut rng = thread_rng();
3348    gossip_peers.partial_shuffle(&mut rng, n);
3349
3350    tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3351
3352    gossip_peers.into_iter().take(n).collect()
3353}
3354
3355/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3356/// filtered by the function `f`.
3357fn get_random_peers(
3358    connected_peers: &HashMap<PeerId, PeerConnections>,
3359    topic_hash: &TopicHash,
3360    n: usize,
3361    f: impl FnMut(&PeerId) -> bool,
3362) -> BTreeSet<PeerId> {
3363    get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3364}
3365
3366/// Validates the combination of signing, privacy and message validation to ensure the
3367/// configuration will not reject published messages.
3368fn validate_config(
3369    authenticity: &MessageAuthenticity,
3370    validation_mode: &ValidationMode,
3371) -> Result<(), &'static str> {
3372    match validation_mode {
3373        ValidationMode::Anonymous => {
3374            if authenticity.is_signing() {
3375                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3376            }
3377
3378            if !authenticity.is_anonymous() {
3379                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3380            }
3381        }
3382        ValidationMode::Strict => {
3383            if !authenticity.is_signing() {
3384                return Err(
3385                    "Messages will be
3386                published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3387                the validation or privacy settings in the config"
3388                );
3389            }
3390        }
3391        _ => {}
3392    }
3393    Ok(())
3394}
3395
3396impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3397    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3398        f.debug_struct("Behaviour")
3399            .field("config", &self.config)
3400            .field("events", &self.events.len())
3401            .field("publish_config", &self.publish_config)
3402            .field("mesh", &self.mesh)
3403            .field("fanout", &self.fanout)
3404            .field("fanout_last_pub", &self.fanout_last_pub)
3405            .field("mcache", &self.mcache)
3406            .field("heartbeat", &self.heartbeat)
3407            .finish()
3408    }
3409}
3410
3411impl fmt::Debug for PublishConfig {
3412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3413        match self {
3414            PublishConfig::Signing { author, .. } => {
3415                f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3416            }
3417            PublishConfig::Author(author) => {
3418                f.write_fmt(format_args!("PublishConfig::Author({author})"))
3419            }
3420            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3421            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3422        }
3423    }
3424}