libp2p_gossipsub/behaviour.rs

// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{
    cmp::{
        max,
        Ordering::{self, Equal},
    },
    collections::{BTreeSet, HashMap, HashSet, VecDeque},
    fmt::{self, Debug},
    net::IpAddr,
    task::{Context, Poll},
    time::Duration,
};

use futures::FutureExt;
use futures_timer::Delay;
use hashlink::LinkedHashMap;
use libp2p_core::{
    multiaddr::Protocol::{Ip4, Ip6},
    transport::PortUse,
    Endpoint, Multiaddr,
};
use libp2p_identity::{Keypair, PeerId};
use libp2p_swarm::{
    behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
    dial_opts::DialOpts,
    ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
    THandlerOutEvent, ToSwarm,
};
#[cfg(feature = "metrics")]
use prometheus_client::registry::Registry;
use quick_protobuf::{MessageWrite, Writer};
use rand::{seq::SliceRandom, thread_rng};
use web_time::{Instant, SystemTime};

#[cfg(feature = "metrics")]
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::{
    backoff::BackoffStorage,
    config::{Config, ValidationMode},
    gossip_promises::GossipPromises,
    handler::{Handler, HandlerEvent, HandlerIn},
    mcache::MessageCache,
    peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
    protocol::SIGNING_PREFIX,
    rpc::Sender,
    rpc_proto::proto,
    subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
    time_cache::DuplicateCache,
    topic::{Hasher, Topic, TopicHash},
    transform::{DataTransform, IdentityTransform},
    types::{
        ControlAction, Graft, IDontWant, IHave, IWant, Message, MessageAcceptance, MessageId,
        PeerDetails, PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription,
        SubscriptionAction,
    },
    FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
};

#[cfg(test)]
mod tests;

/// IDONTWANT cache capacity.
const IDONTWANT_CAP: usize = 10_000;

/// IDONTWANT timeout before removal.
const IDONTWANT_TIMEOUT: Duration = Duration::new(3, 0);

/// Determines if published messages should be signed or not.
///
/// Without signing, a number of privacy preserving modes can be selected.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`Config`] to allow for unsigned messages.
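///
/// A brief usage sketch (not compiled as a doctest; assumes an ed25519 identity keypair):
///
/// ```ignore
/// use libp2p_identity::Keypair;
///
/// // Sign every published message with our identity key (the default expectation).
/// let signed = MessageAuthenticity::Signed(Keypair::generate_ed25519());
///
/// // Alternatively, publish fully anonymous messages. This requires a relaxed
/// // `ValidationMode` and a custom `message_id` function in the `Config`.
/// let anonymous = MessageAuthenticity::Anonymous;
/// ```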
#[derive(Clone)]
pub enum MessageAuthenticity {
    /// Message signing is enabled. The author will be the owner of the key and the sequence number
    /// will be linearly increasing.
    Signed(Keypair),
    /// Message signing is disabled.
    ///
    /// The specified [`PeerId`] will be used as the author of all published messages. The sequence
    /// number will be randomized.
    Author(PeerId),
    /// Message signing is disabled.
    ///
    /// A random [`PeerId`] will be used when publishing each message. The sequence number will be
    /// randomized.
    RandomAuthor,
    /// Message signing is disabled.
    ///
    /// The author of the message and the sequence numbers are excluded from the message.
    ///
    /// NOTE: Excluding these fields may cause these messages to be considered invalid by other
    /// nodes that enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
    /// for how to customise this for rust-libp2p gossipsub. A custom `message_id`
    /// function will need to be set to prevent all messages from a peer being filtered
    /// as duplicates.
    Anonymous,
}

impl MessageAuthenticity {
    /// Returns true if signing is enabled.
    pub fn is_signing(&self) -> bool {
        matches!(self, MessageAuthenticity::Signed(_))
    }

    /// Returns true if messages are published anonymously.
    pub fn is_anonymous(&self) -> bool {
        matches!(self, MessageAuthenticity::Anonymous)
    }
}

/// Event that can be emitted by the gossipsub behaviour.
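///
/// A minimal sketch of how an application might branch on these events (assumes the events
/// are taken from the swarm driving this behaviour):
///
/// ```ignore
/// match event {
///     Event::Message { propagation_source, message_id, message } => {
///         println!("{propagation_source} forwarded {message_id}: {} bytes", message.data.len());
///     }
///     Event::Subscribed { peer_id, topic } => println!("{peer_id} subscribed to {topic}"),
///     Event::Unsubscribed { peer_id, topic } => println!("{peer_id} unsubscribed from {topic}"),
///     Event::GossipsubNotSupported { peer_id } => println!("{peer_id} does not speak gossipsub"),
///     Event::SlowPeer { peer_id, .. } => println!("{peer_id} is not keeping up"),
/// }
/// ```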
#[derive(Debug)]
pub enum Event {
    /// A message has been received.
    Message {
        /// The peer that forwarded us this message.
        propagation_source: PeerId,
        /// The [`MessageId`] of the message. This should be referenced by the application when
        /// validating a message (if required).
        message_id: MessageId,
        /// The decompressed message itself.
        message: Message,
    },
    /// A remote subscribed to a topic.
    Subscribed {
        /// Remote that has subscribed.
        peer_id: PeerId,
        /// The topic it has subscribed to.
        topic: TopicHash,
    },
    /// A remote unsubscribed from a topic.
    Unsubscribed {
        /// Remote that has unsubscribed.
        peer_id: PeerId,
        /// The topic it has unsubscribed from.
        topic: TopicHash,
    },
    /// A peer that does not support gossipsub has connected.
    GossipsubNotSupported { peer_id: PeerId },
    /// A peer is not able to download messages in time.
    SlowPeer {
        /// The peer_id
        peer_id: PeerId,
        /// The types and amounts of failed messages that are occurring for this peer.
        failed_messages: FailedMessages,
    },
}

/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`]
/// for further details.
#[allow(clippy::large_enum_variant)]
enum PublishConfig {
    Signing {
        keypair: Keypair,
        author: PeerId,
        inline_key: Option<Vec<u8>>,
        last_seq_no: SequenceNumber,
    },
    Author(PeerId),
    RandomAuthor,
    Anonymous,
}

/// A strictly linearly increasing sequence number.
///
/// We start from the current time as a unix timestamp in nanoseconds.
#[derive(Debug)]
struct SequenceNumber(u64);

impl SequenceNumber {
    fn new() -> Self {
        let unix_timestamp = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("time to be linear")
            .as_nanos();

        Self(unix_timestamp as u64)
    }

    fn next(&mut self) -> u64 {
        self.0 = self
            .0
            .checked_add(1)
            .expect("to not exhaust u64 space for sequence numbers");

        self.0
    }
}

impl PublishConfig {
    pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
        match self {
            Self::Signing { author, .. } => Some(author),
            Self::Author(author) => Some(author),
            _ => None,
        }
    }
}

impl From<MessageAuthenticity> for PublishConfig {
    fn from(authenticity: MessageAuthenticity) -> Self {
        match authenticity {
            MessageAuthenticity::Signed(keypair) => {
                let public_key = keypair.public();
                let key_enc = public_key.encode_protobuf();
                let key = if key_enc.len() <= 42 {
                    // The public key can be inlined in [`rpc_proto::proto::Message::from`], so we
                    // don't include it specifically in the
                    // [`rpc_proto::proto::Message::key`] field.
                    None
                } else {
                    // Include the protobuf encoding of the public key in the message.
                    Some(key_enc)
                };

                PublishConfig::Signing {
                    keypair,
                    author: public_key.to_peer_id(),
                    inline_key: key,
                    last_seq_no: SequenceNumber::new(),
                }
            }
            MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
            MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
            MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
        }
    }
}

/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
/// The [`DataTransform`] trait allows applications to optionally add extra encoding/decoding
/// functionality to the underlying messages. This is intended for custom compression algorithms.
///
/// The [`TopicSubscriptionFilter`] allows applications to implement specific filters on topics to
/// prevent unwanted messages from being propagated and evaluated.
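///
/// A construction sketch using the defaults (identity data transform, allow-all subscription
/// filter and the library's default [`Config`]):
///
/// ```ignore
/// use libp2p_identity::Keypair;
///
/// let keypair = Keypair::generate_ed25519();
/// let behaviour: Behaviour = Behaviour::new(
///     MessageAuthenticity::Signed(keypair),
///     Config::default(),
/// )
/// .expect("a valid gossipsub configuration");
/// ```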
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
    /// Configuration providing gossipsub performance parameters.
    config: Config,

    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<ToSwarm<Event, HandlerIn>>,

    /// Information used for publishing messages.
    publish_config: PublishConfig,

    /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
    /// duplicates from being propagated to the application and on the network.
    duplicate_cache: DuplicateCache<MessageId>,

    /// A set of connected peers, indexed by their [`PeerId`], tracking both the [`PeerKind`] and
    /// the set of [`ConnectionId`]s.
    connected_peers: HashMap<PeerId, PeerDetails>,

    /// A set of all explicit peers. These are peers that remain connected and we unconditionally
    /// forward messages to, outside of the scoring system.
    explicit_peers: HashSet<PeerId>,

    /// A list of peers that have been blacklisted by the user.
    /// Messages are not sent to and are rejected from these peers.
    blacklisted_peers: HashSet<PeerId>,

    /// Overlay network of connected peers - Maps topics to connected gossipsub peers.
    mesh: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// Map of topics to list of peers that we publish to, but don't subscribe to.
    fanout: HashMap<TopicHash, BTreeSet<PeerId>>,

    /// The last publish time for fanout topics.
    fanout_last_pub: HashMap<TopicHash, Instant>,

    /// Storage for backoffs.
    backoffs: BackoffStorage,

    /// Message cache for the last few heartbeats.
    mcache: MessageCache,

    /// Heartbeat interval stream.
    heartbeat: Delay,

    /// Number of heartbeats since the beginning of time; this allows us to amortize some resource
    /// clean up, e.g. backoff clean up.
    heartbeat_ticks: u64,

    /// We remember all peers we found through peer exchange, since those peers are not considered
    /// as safe as randomly discovered outbound peers. This behaviour diverges from the go
    /// implementation to avoid possible love bombing attacks in PX. On disconnect, peers are
    /// removed from this list, which may result in a true outbound rediscovery.
    px_peers: HashSet<PeerId>,

    /// Stores optional peer score data together with thresholds, decay interval and gossip
    /// promises.
    peer_score: PeerScoreState,

    /// Counts the number of `IHAVE` received from each peer since the last heartbeat.
    count_received_ihave: HashMap<PeerId, usize>,

    /// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
    count_sent_iwant: HashMap<PeerId, usize>,

    /// Short term cache for published message ids. This is used for penalizing peers sending
    /// our own messages back if the messages are anonymous or use a random author.
    published_message_ids: DuplicateCache<MessageId>,

    /// The filter used to handle message subscriptions.
    subscription_filter: F,

    /// A general transformation function that can be applied to data received from the wire before
    /// calculating the message-id and sending to the application. This is designed to allow the
    /// user to implement arbitrary topic-based compression algorithms.
    data_transform: D,

    /// Keep track of a set of internal metrics relating to gossipsub.
    #[cfg(feature = "metrics")]
    metrics: Option<Metrics>,

    /// Tracks the numbers of failed messages per peer-id.
    failed_messages: HashMap<PeerId, FailedMessages>,

    /// Tracks recently sent `IWANT` messages and checks if peers respond to them.
    gossip_promises: GossipPromises,
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform + Default,
    F: TopicSubscriptionFilter + Default,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`]. This has no subscription filter and uses no compression.
    pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            F::default(),
            D::default(),
        )
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform + Default,
    F: TopicSubscriptionFilter,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`] and a custom subscription filter.
    pub fn new_with_subscription_filter(
        privacy: MessageAuthenticity,
        config: Config,
        subscription_filter: F,
    ) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            subscription_filter,
            D::default(),
        )
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform,
    F: TopicSubscriptionFilter + Default,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`] and a custom data transform.
    /// Metrics are disabled by default.
    pub fn new_with_transform(
        privacy: MessageAuthenticity,
        config: Config,
        data_transform: D,
    ) -> Result<Self, &'static str> {
        Self::new_with_subscription_filter_and_transform(
            privacy,
            config,
            F::default(),
            data_transform,
        )
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform,
    F: TopicSubscriptionFilter,
{
    /// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
    /// [`Config`] and a custom subscription filter and data transform.
    /// Metrics are disabled by default.
    pub fn new_with_subscription_filter_and_transform(
        privacy: MessageAuthenticity,
        config: Config,
        subscription_filter: F,
        data_transform: D,
    ) -> Result<Self, &'static str> {
        // Set up the router given the configuration settings.

        // We do not allow configurations where a published message would also be rejected if it
        // were received locally.
        validate_config(&privacy, config.validation_mode())?;

        Ok(Behaviour {
            #[cfg(feature = "metrics")]
            metrics: None,
            events: VecDeque::new(),
            publish_config: privacy.into(),
            duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
            explicit_peers: HashSet::new(),
            blacklisted_peers: HashSet::new(),
            mesh: HashMap::new(),
            fanout: HashMap::new(),
            fanout_last_pub: HashMap::new(),
            backoffs: BackoffStorage::new(
                &config.prune_backoff(),
                config.heartbeat_interval(),
                config.backoff_slack(),
            ),
            mcache: MessageCache::new(config.history_gossip(), config.history_length()),
            heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
            heartbeat_ticks: 0,
            px_peers: HashSet::new(),
            peer_score: PeerScoreState::Disabled,
            count_received_ihave: HashMap::new(),
            count_sent_iwant: HashMap::new(),
            connected_peers: HashMap::new(),
            published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
            config,
            subscription_filter,
            data_transform,
            failed_messages: Default::default(),
            gossip_promises: Default::default(),
        })
    }

    /// Allow the [`Behaviour`] to also record metrics.
    /// Metrics can be evaluated by passing a reference to a [`Registry`].
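    ///
    /// A brief sketch (assumes the `metrics` feature is enabled and that the metrics
    /// config type implements `Default`):
    ///
    /// ```ignore
    /// use prometheus_client::registry::Registry;
    ///
    /// let mut registry = Registry::default();
    /// let behaviour = behaviour.with_metrics(&mut registry, Default::default());
    /// ```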
    #[cfg(feature = "metrics")]
    pub fn with_metrics(
        mut self,
        metrics_registry: &mut Registry,
        metrics_config: MetricsConfig,
    ) -> Self {
        self.metrics = Some(Metrics::new(metrics_registry, metrics_config));
        self
    }
}

impl<D, F> Behaviour<D, F>
where
    D: DataTransform + Send + 'static,
    F: TopicSubscriptionFilter + Send + 'static,
{
    /// Lists the hashes of the topics we are currently subscribed to.
    pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
        self.mesh.keys()
    }

    /// Lists all mesh peers for a certain topic hash.
    pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
        self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
    }

    /// Lists all mesh peers across all topics, without duplicates.
    pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
        let mut res = BTreeSet::new();
        for peers in self.mesh.values() {
            res.extend(peers);
        }
        res.into_iter()
    }

    /// Lists all known peers and their associated subscribed topics.
    pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
        self.connected_peers
            .iter()
            .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
    }

    /// Lists all known peers and their associated protocol.
    pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
        self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
    }

    /// Returns the gossipsub score for a given peer, if one exists.
    pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
        match &self.peer_score {
            PeerScoreState::Active(peer_score) => Some(peer_score.score_report(peer_id).score),
            PeerScoreState::Disabled => None,
        }
    }

    /// Subscribe to a topic.
    ///
    /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
    /// subscribed.
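    ///
    /// A brief sketch (assumes the crate's `IdentTopic` alias, i.e. a `Topic<IdentityHash>`):
    ///
    /// ```ignore
    /// let topic = libp2p_gossipsub::IdentTopic::new("example-topic");
    /// match behaviour.subscribe(&topic) {
    ///     Ok(true) => println!("newly subscribed"),
    ///     Ok(false) => println!("already subscribed"),
    ///     Err(e) => eprintln!("subscription rejected: {e:?}"),
    /// }
    /// ```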
    pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
        let topic_hash = topic.hash();
        if !self.subscription_filter.can_subscribe(&topic_hash) {
            return Err(SubscriptionError::NotAllowed);
        }

        if self.mesh.contains_key(&topic_hash) {
            tracing::debug!(%topic, "Topic is already in the mesh");
            return Ok(false);
        }

        // send subscription request to all peers
        for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
            tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
            let event = RpcOut::Subscribe(topic_hash.clone());
            self.send_message(peer_id, event);
        }

        // call JOIN(topic)
        // this will add new peers to the mesh for the topic
        self.join(&topic_hash);
        tracing::debug!(%topic, "Subscribed to topic");
        Ok(true)
    }

    /// Unsubscribes from a topic.
    ///
    /// Returns `true` if we were subscribed to this topic.
    pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
        let topic_hash = topic.hash();

        if !self.mesh.contains_key(&topic_hash) {
            tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
            // we are not subscribed
            return false;
        }

        // announce to all peers
        for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
            tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
            let event = RpcOut::Unsubscribe(topic_hash.clone());
            self.send_message(peer, event);
        }

        // call LEAVE(topic)
        // this will remove the topic from the mesh
        self.leave(&topic_hash);

        tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
        true
    }

    /// Publishes a message to the network on the given topic.
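    ///
    /// A brief sketch, reusing the `topic` from [`Self::subscribe`] (the error handling shown is
    /// illustrative, not exhaustive):
    ///
    /// ```ignore
    /// match behaviour.publish(topic.clone(), b"hello world".to_vec()) {
    ///     Ok(message_id) => println!("published {message_id}"),
    ///     Err(PublishError::NoPeersSubscribedToTopic) => println!("no peers on this topic yet"),
    ///     Err(e) => eprintln!("publish failed: {e:?}"),
    /// }
    /// ```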
    pub fn publish(
        &mut self,
        topic: impl Into<TopicHash>,
        data: impl Into<Vec<u8>>,
    ) -> Result<MessageId, PublishError> {
        let data = data.into();
        let topic = topic.into();

        // Transform the data before building a raw_message.
        let transformed_data = self
            .data_transform
            .outbound_transform(&topic, data.clone())?;

        let max_transmit_size_for_topic = self
            .config
            .protocol_config()
            .max_transmit_size_for_topic(&topic);

        // check that the size doesn't exceed the max transmission size.
        if transformed_data.len() > max_transmit_size_for_topic {
            return Err(PublishError::MessageTooLarge);
        }

        let mesh_n = self.config.mesh_n_for_topic(&topic);
        let raw_message = self.build_raw_message(topic, transformed_data)?;

        // calculate the message id from the un-transformed data
        let msg_id = self.config.message_id(&Message {
            source: raw_message.source,
            data, // the uncompressed form
            sequence_number: raw_message.sequence_number,
            topic: raw_message.topic.clone(),
        });

        // Check if the message has been published before.
        if self.duplicate_cache.contains(&msg_id) {
            // This message has already been seen. We don't re-publish messages that have already
            // been published on the network.
            tracing::warn!(
                message_id=%msg_id,
                "Not publishing a message that has already been published"
            );
            return Err(PublishError::Duplicate);
        }

        tracing::trace!(message_id=%msg_id, "Publishing message");

        let topic_hash = raw_message.topic.clone();

        let mut peers_on_topic = self
            .connected_peers
            .iter()
            .filter(|(_, p)| p.topics.contains(&topic_hash))
            .map(|(peer_id, _)| peer_id)
            .peekable();

        if peers_on_topic.peek().is_none() {
            return Err(PublishError::NoPeersSubscribedToTopic);
        }

        let mut recipient_peers = HashSet::new();
        if self.config.flood_publish() {
            // Forward to all peers above score and all explicit peers
            recipient_peers.extend(peers_on_topic.filter(|p| {
                self.explicit_peers.contains(*p)
                    || !self
                        .peer_score
                        .below_threshold(p, |ts| ts.publish_threshold)
                        .0
            }));
        } else {
            match self.mesh.get(&topic_hash) {
                // Mesh peers
                Some(mesh_peers) => {
                    // We have a mesh set. We want to make sure to publish to at least `mesh_n`
                    // peers (if possible).
                    let needed_extra_peers = mesh_n.saturating_sub(mesh_peers.len());

                    if needed_extra_peers > 0 {
                        // We don't have `mesh_n` peers in our mesh, we will randomly select extras
                        // and publish to them.

                        // Get a random set of peers that are appropriate to send messages to.
                        let peer_list = get_random_peers(
                            &self.connected_peers,
                            &topic_hash,
                            needed_extra_peers,
                            |peer| {
                                !mesh_peers.contains(peer)
                                    && !self.explicit_peers.contains(peer)
                                    && !self
                                        .peer_score
                                        .below_threshold(peer, |ts| ts.publish_threshold)
                                        .0
                            },
                        );
                        recipient_peers.extend(peer_list);
                    }

                    recipient_peers.extend(mesh_peers);
                }
                // Gossipsub peers
                None => {
                    tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
                    // `fanout_peers` is always non-empty if it's `Some`.
                    let fanout_peers = self
                        .fanout
                        .get(&topic_hash)
                        .filter(|peers| !peers.is_empty());
                    // If we have fanout peers add them to the map.
                    if let Some(peers) = fanout_peers {
                        for peer in peers {
                            recipient_peers.insert(*peer);
                        }
                    } else {
                        // We have no fanout peers, select mesh_n of them and add them to the fanout
                        let new_peers =
                            get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
                                |p| {
                                    !self.explicit_peers.contains(p)
                                        && !self
                                            .peer_score
                                            .below_threshold(p, |ts| ts.publish_threshold)
                                            .0
                                }
                            });
                        // Add the new peers to the fanout and recipient peers
                        self.fanout.insert(topic_hash.clone(), new_peers.clone());
                        for peer in new_peers {
                            tracing::debug!(%peer, "Peer added to fanout");
                            recipient_peers.insert(peer);
                        }
                    }
                    // We are publishing to fanout peers - update the time we published
                    self.fanout_last_pub
                        .insert(topic_hash.clone(), Instant::now());
                }
            }

            // Explicit peers that are part of the topic
            recipient_peers
                .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));

            // Floodsub peers
            for (peer, connections) in &self.connected_peers {
                if connections.kind == PeerKind::Floodsub
                    && connections.topics.contains(&topic_hash)
                    && !self
                        .peer_score
                        .below_threshold(peer, |ts| ts.publish_threshold)
                        .0
                {
                    recipient_peers.insert(*peer);
                }
            }
        }

        // If the message isn't a duplicate and we have sent it to some peers, add it to the
        // duplicate cache and memcache.
        self.duplicate_cache.insert(msg_id.clone());
        self.mcache.put(&msg_id, raw_message.clone());

        // Consider the message as delivered for gossip promises.
        self.gossip_promises.message_delivered(&msg_id);

        // If the message is anonymous or has a random author add it to the published message ids
        // cache.
        if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
            if !self.config.allow_self_origin() {
                self.published_message_ids.insert(msg_id.clone());
            }
        }

        // Send to peers we know are subscribed to the topic.
        let mut publish_failed = true;
        for peer_id in recipient_peers.iter() {
            tracing::trace!(peer=%peer_id, "Sending message to peer");
            // If enabled, send an IDONTWANT first so that if we are slower than forwarders
            // publishing the original message we don't receive it back.
            if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold()
                && self.config.idontwant_on_publish()
            {
                self.send_message(
                    *peer_id,
                    RpcOut::IDontWant(IDontWant {
                        message_ids: vec![msg_id.clone()],
                    }),
                );
            }

            if self.send_message(
                *peer_id,
                RpcOut::Publish {
                    message: raw_message.clone(),
                    timeout: Delay::new(self.config.publish_queue_duration()),
                },
            ) {
                publish_failed = false
            }
        }

        if recipient_peers.is_empty() {
            return Err(PublishError::NoPeersSubscribedToTopic);
        }

        if publish_failed {
            return Err(PublishError::AllQueuesFull(recipient_peers.len()));
        }

        tracing::debug!(message_id=%msg_id, "Published message");

        #[cfg(feature = "metrics")]
        if let Some(metrics) = self.metrics.as_mut() {
            metrics.register_published_message(&topic_hash);
        }

        Ok(msg_id)
    }

    /// This function should be called when [`Config::validate_messages()`] is `true` after
    /// the message has been validated by the caller. Messages are stored in the [`MessageCache`]
    /// and validation is expected to be fast enough that the messages should still exist in the
    /// cache. There are three possible validation outcomes and the outcome is given in
    /// `acceptance`.
    ///
    /// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
    /// network. The `propagation_source` parameter indicates who the message was received by and
    /// will not be forwarded back to that peer.
    ///
    /// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
    /// and the P₄ penalty will be applied to the `propagation_source`.
    ///
    /// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
    /// but no P₄ penalty will be applied.
    ///
    /// This function will return `true` if the message was found in the cache and `false` if it
    /// was no longer in the cache.
    ///
    /// This should only be called once per message.
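    ///
    /// A brief sketch of the application-side flow (assumes `validate_messages` was enabled in
    /// the [`Config`] and that `message_id` and `propagation_source` come from a previously
    /// received [`Event::Message`]):
    ///
    /// ```ignore
    /// // After the application has inspected the message and found it valid:
    /// let forwarded = behaviour.report_message_validation_result(
    ///     &message_id,
    ///     &propagation_source,
    ///     MessageAcceptance::Accept,
    /// );
    /// if !forwarded {
    ///     // The message had already expired from the message cache.
    /// }
    /// ```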
    pub fn report_message_validation_result(
        &mut self,
        msg_id: &MessageId,
        propagation_source: &PeerId,
        acceptance: MessageAcceptance,
    ) -> bool {
        let reject_reason = match acceptance {
            MessageAcceptance::Accept => {
                let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
                    Some((raw_message, originating_peers)) => {
                        (raw_message.clone(), originating_peers)
                    }
                    None => {
                        tracing::warn!(
                            message_id=%msg_id,
                            "Message not in cache. Ignoring forwarding"
                        );
                        #[cfg(feature = "metrics")]
                        if let Some(metrics) = self.metrics.as_mut() {
                            metrics.memcache_miss();
                        }
                        return false;
                    }
                };

                #[cfg(feature = "metrics")]
                if let Some(metrics) = self.metrics.as_mut() {
                    metrics.register_msg_validation(&raw_message.topic, &acceptance);
                }

                self.forward_msg(
                    msg_id,
                    raw_message,
                    Some(propagation_source),
                    originating_peers,
                );
                return true;
            }
            MessageAcceptance::Reject => RejectReason::ValidationFailed,
            MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
        };

        if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
            #[cfg(feature = "metrics")]
            if let Some(metrics) = self.metrics.as_mut() {
                metrics.register_msg_validation(&raw_message.topic, &acceptance);
            }

            // Tell peer_score about reject
            // Reject the original source, and any duplicates we've seen from other peers.
            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                peer_score.reject_message(
                    propagation_source,
                    msg_id,
                    &raw_message.topic,
                    reject_reason,
                );
                for peer in originating_peers.iter() {
                    peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
                }
            }
            true
        } else {
            tracing::warn!(message_id=%msg_id, "Rejected message not in cache");
            false
        }
    }

    /// Adds a new peer to the list of explicitly connected peers.
    pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
        tracing::debug!(peer=%peer_id, "Adding explicit peer");

        self.explicit_peers.insert(*peer_id);

        self.check_explicit_peer_connection(peer_id);
    }

    /// Removes the peer from the list of explicitly connected peers. Note that this does not
    /// disconnect the peer.
    pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
        tracing::debug!(peer=%peer_id, "Removing explicit peer");
        self.explicit_peers.remove(peer_id);
    }

    /// Blacklists a peer. All messages received from this peer, and any message authored by this
    /// peer, will be rejected.
    pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
        if self.blacklisted_peers.insert(*peer_id) {
            tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
        }
    }

    /// Removes a peer from the blacklist if it has previously been blacklisted.
    pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
        if self.blacklisted_peers.remove(peer_id) {
            tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
        }
    }

    /// Activates the peer scoring system with the given parameters. This will reset all scores
    /// if there was already another peer scoring system activated. Returns an error if the
    /// params are not valid or if they have already been set.
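    ///
    /// A brief sketch using the library defaults (both parameter types implement `Default`):
    ///
    /// ```ignore
    /// behaviour
    ///     .with_peer_score(PeerScoreParams::default(), PeerScoreThresholds::default())
    ///     .expect("valid peer score parameters");
    /// ```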
    pub fn with_peer_score(
        &mut self,
        params: PeerScoreParams,
        threshold: PeerScoreThresholds,
    ) -> Result<(), String> {
        self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
    }

    /// Activates the peer scoring system with the given parameters and a message delivery time
    /// callback. Returns an error if the parameters have already been set.
    pub fn with_peer_score_and_message_delivery_time_callback(
        &mut self,
        params: PeerScoreParams,
        thresholds: PeerScoreThresholds,
        callback: Option<fn(&PeerId, &TopicHash, f64)>,
    ) -> Result<(), String> {
        params.validate()?;
        thresholds.validate()?;

        if let PeerScoreState::Active(_) = self.peer_score {
            return Err("Peer score set twice".into());
        }

        let peer_score =
            PeerScore::new_with_message_delivery_time_callback(params, thresholds, callback);
        self.peer_score = PeerScoreState::Active(Box::new(peer_score));
        Ok(())
    }

    /// Sets scoring parameters for a topic.
    ///
    /// [`Self::with_peer_score()`] must be called first to initialise peer scoring.
    pub fn set_topic_params<H: Hasher>(
        &mut self,
        topic: Topic<H>,
        params: TopicScoreParams,
    ) -> Result<(), &'static str> {
        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
            peer_score.set_topic_params(topic.hash(), params);
            Ok(())
        } else {
            Err("Peer score must be initialised with `with_peer_score()`")
        }
    }

    /// Returns the scoring parameters for a topic if they exist.
    pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
        match &self.peer_score {
            PeerScoreState::Active(peer_score) => peer_score.get_topic_params(&topic.hash()),
            PeerScoreState::Disabled => None,
        }
    }

    /// Sets the application specific score for a peer. Returns true if scoring is active and
    /// the peer is connected or if the score of the peer is not yet expired, false otherwise.
    pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
            peer_score.set_application_score(peer_id, new_score)
        } else {
            false
        }
    }

    /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages.
    fn join(&mut self, topic_hash: &TopicHash) {
        let mut added_peers = HashSet::new();
        let mesh_n = self.config.mesh_n_for_topic(topic_hash);
        #[cfg(feature = "metrics")]
        if let Some(m) = self.metrics.as_mut() {
            m.joined(topic_hash)
        }

        // Always construct a mesh, regardless of whether we find peers or not.
        self.mesh.entry(topic_hash.clone()).or_default();

        // check if we have mesh_n peers in fanout[topic] and add them to the mesh if we do,
        // removing the fanout entry.
        if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Removing peers from the fanout for topic"
            );

            // remove explicit peers, peers with negative scores, and backoffed peers
            peers.retain(|p| {
                !self.explicit_peers.contains(p)
                    && !self.peer_score.below_threshold(p, |_| 0.0).0
                    && !self.backoffs.is_backoff_with_slack(topic_hash, p)
            });

            // Add up to mesh_n of them to the mesh
            // NOTE: These aren't randomly added, currently FIFO
            let add_peers = std::cmp::min(peers.len(), mesh_n);
            tracing::debug!(
                topic=%topic_hash,
                "JOIN: Adding {:?} peers from the fanout for topic",
                add_peers
            );
            added_peers.extend(peers.iter().take(add_peers));

            self.mesh.insert(
                topic_hash.clone(),
                peers.into_iter().take(add_peers).collect(),
            );

            // remove the last published time
            self.fanout_last_pub.remove(topic_hash);
        }

        #[cfg(feature = "metrics")]
        if let Some(m) = self.metrics.as_mut() {
            m.peers_included(topic_hash, Inclusion::Fanout, added_peers.len())
        }

        // check if we need to get more peers, which we randomly select
        if added_peers.len() < mesh_n {
            // get the peers
            let random_added = get_random_peers(
                &self.connected_peers,
                topic_hash,
                mesh_n - added_peers.len(),
                |peer| {
                    !added_peers.contains(peer)
                        && !self.explicit_peers.contains(peer)
                        && !self.peer_score.below_threshold(peer, |_| 0.0).0
                        && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
                },
            );

            added_peers.extend(random_added.clone());
            // add them to the mesh
            tracing::debug!(
                "JOIN: Inserting {:?} random peers into the mesh",
                random_added.len()
            );

            #[cfg(feature = "metrics")]
            if let Some(m) = self.metrics.as_mut() {
                m.peers_included(topic_hash, Inclusion::Random, random_added.len())
            }

            let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
            mesh_peers.extend(random_added);
        }

        for peer_id in added_peers {
            // Send a GRAFT control message
            tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                peer_score.graft(&peer_id, topic_hash.clone());
            }
            self.send_message(
                peer_id,
                RpcOut::Graft(Graft {
                    topic_hash: topic_hash.clone(),
                }),
            );

            // If the peer did not previously exist in any mesh, inform the handler
            peer_added_to_mesh(
                peer_id,
                vec![topic_hash],
                &self.mesh,
                &mut self.events,
                &self.connected_peers,
            );
        }

        #[cfg(feature = "metrics")]
        {
            let mesh_peers = self.mesh_peers(topic_hash).count();
            if let Some(m) = self.metrics.as_mut() {
                m.set_mesh_peers(topic_hash, mesh_peers)
            }
        }

        tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
    }

    /// Creates a PRUNE gossipsub action.
    fn make_prune(
        &mut self,
        topic_hash: &TopicHash,
        peer: &PeerId,
        do_px: bool,
        on_unsubscribe: bool,
    ) -> Prune {
        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
            peer_score.prune(peer, topic_hash.clone());
        }

        match self.connected_peers.get(peer).map(|v| &v.kind) {
            Some(PeerKind::Floodsub) => {
                tracing::error!("Attempted to prune a Floodsub peer");
            }
            Some(PeerKind::Gossipsub) => {
                // Gossipsub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
                return Prune {
                    topic_hash: topic_hash.clone(),
                    peers: Vec::new(),
                    backoff: None,
                };
            }
            None => {
                tracing::error!("Attempted to prune an unknown peer");
            }
            _ => {} // Gossipsub v1.1 peers perform the `Prune`
        }

        // Select peers for peer exchange
        let peers = if do_px {
            get_random_peers(
                &self.connected_peers,
                topic_hash,
                self.config.prune_peers(),
                |p| p != peer && !self.peer_score.below_threshold(p, |_| 0.0).0,
            )
            .into_iter()
            .map(|p| PeerInfo { peer_id: Some(p) })
            .collect()
        } else {
            Vec::new()
        };

        let backoff = if on_unsubscribe {
            self.config.unsubscribe_backoff()
        } else {
            self.config.prune_backoff()
        };

        // update backoff
        self.backoffs.update_backoff(topic_hash, peer, backoff);

        Prune {
            topic_hash: topic_hash.clone(),
            peers,
            backoff: Some(backoff.as_secs()),
        }
    }

    /// Gossipsub LEAVE(topic) - Notifies mesh\[topic\] peers with PRUNE messages.
    fn leave(&mut self, topic_hash: &TopicHash) {
        tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");

        // If our mesh contains the topic, send prune to peers and delete it from the mesh
        if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
            #[cfg(feature = "metrics")]
            if let Some(m) = self.metrics.as_mut() {
                m.left(topic_hash)
            }
            for peer_id in peers {
                // Send a PRUNE control message
                tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");

                let on_unsubscribe = true;
                let prune =
                    self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
                self.send_message(peer_id, RpcOut::Prune(prune));

                // If the peer is no longer in any mesh, inform the handler
                peer_removed_from_mesh(
                    peer_id,
                    topic_hash,
                    &self.mesh,
                    &mut self.events,
                    &self.connected_peers,
                );
            }
        }
        tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
    }

    /// Checks if the given peer is still connected and if not dials the peer again.
    fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
        if !self.connected_peers.contains_key(peer_id) {
            // Connect to peer
            tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
            self.events.push_back(ToSwarm::Dial {
                opts: DialOpts::peer_id(*peer_id).build(),
            });
        }
    }

    /// Handles an IHAVE control message. Checks our cache of messages. If the message is unknown,
    /// requests it with an IWANT control message.
    fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
        // We ignore IHAVE gossip from any peer whose score is below the gossip threshold
        if let (true, score) = self
            .peer_score
            .below_threshold(peer_id, |ts| ts.gossip_threshold)
        {
            tracing::debug!(
                peer=%peer_id,
                %score,
                "IHAVE: ignoring peer with score below threshold"
            );
            return;
        }

        // IHAVE flood protection
        let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
        *peer_have += 1;
        if *peer_have > self.config.max_ihave_messages() {
            tracing::debug!(
                peer=%peer_id,
                "IHAVE: peer has advertised too many times ({}) within this heartbeat \
            interval; ignoring",
                *peer_have
            );
            return;
        }

        if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
            if *iasked >= self.config.max_ihave_length() {
                tracing::debug!(
                    peer=%peer_id,
                    "IHAVE: peer has already advertised too many messages ({}); ignoring",
                    *iasked
                );
                return;
            }
        }

        tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");

        let mut iwant_ids = HashSet::new();

        for (topic, ids) in ihave_msgs {
            // only process the message if we are subscribed
            if !self.mesh.contains_key(&topic) {
                tracing::debug!(
                    %topic,
                    "IHAVE: Ignoring IHAVE - Not subscribed to topic"
                );
                continue;
            }

            for id in ids.into_iter().filter(|id| {
                if self.duplicate_cache.contains(id) {
                    return false;
                }

                !self.gossip_promises.contains(id)
            }) {
                // have not seen this message and are not currently requesting it
                if iwant_ids.insert(id) {
                    // Register the IWANT metric
                    #[cfg(feature = "metrics")]
                    if let Some(metrics) = self.metrics.as_mut() {
                        metrics.register_iwant(&topic);
                    }
                }
            }
        }

        if !iwant_ids.is_empty() {
            let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
            let mut iask = iwant_ids.len();
            if *iasked + iask > self.config.max_ihave_length() {
                iask = self.config.max_ihave_length().saturating_sub(*iasked);
            }

            // Send the list of IWANT control messages
            tracing::debug!(
                peer=%peer_id,
                "IHAVE: Asking for {} out of {} messages from peer",
                iask,
                iwant_ids.len()
            );

            // Ask in random order
            let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
            let mut rng = thread_rng();
            iwant_ids_vec.partial_shuffle(&mut rng, iask);

            iwant_ids_vec.truncate(iask);
            *iasked += iask;

            self.gossip_promises.add_promise(
                *peer_id,
                &iwant_ids_vec,
                Instant::now() + self.config.iwant_followup_time(),
            );
            tracing::trace!(
                peer=%peer_id,
                "IHAVE: Asking for the following messages from peer: {:?}",
                iwant_ids_vec
            );

            self.send_message(
                *peer_id,
                RpcOut::IWant(IWant {
                    message_ids: iwant_ids_vec,
                }),
            );
        }
        tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
    }

    /// Handles an IWANT control message. Checks our cache of messages. If the message exists it is
    /// forwarded to the requesting peer.
    fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
        // We ignore IWANT gossip from any peer whose score is below the gossip threshold
        if let (true, score) = self
            .peer_score
            .below_threshold(peer_id, |ts| ts.gossip_threshold)
        {
            tracing::debug!(
                peer=%peer_id,
                "IWANT: ignoring peer with score below threshold [score = {}]",
                score
            );
            return;
        }

        tracing::debug!(peer=%peer_id, "Handling IWANT for peer");

        for id in iwant_msgs {
            // If we have it and the IHAVE count is not above the threshold,
            // forward the message.
            if let Some((msg, count)) = self
                .mcache
                .get_with_iwant_counts(&id, peer_id)
                .map(|(msg, count)| (msg.clone(), count))
            {
                if count > self.config.gossip_retransimission() {
                    tracing::debug!(
                        peer=%peer_id,
                        message_id=%id,
                        "IWANT: Peer has asked for message too many times; ignoring request"
                    );
                } else {
                    if let Some(peer) = self.connected_peers.get_mut(peer_id) {
                        if peer.dont_send.contains_key(&id) {
                            tracing::debug!(%peer_id, message_id=%id, "Peer already sent IDONTWANT for this message");
                            continue;
                        }
                    }

                    tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
                    self.send_message(
                        *peer_id,
                        RpcOut::Forward {
                            message: msg,
                            timeout: Delay::new(self.config.forward_queue_duration()),
                        },
                    );
                }
            }
        }
        tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
    }

    /// Handles GRAFT control messages. If subscribed to the topic, adds the peer to mesh, if not,
    /// responds with PRUNE messages.
    fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
        tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");

        let mut to_prune_topics = HashSet::new();

        let mut do_px = self.config.do_px();

        let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
            tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
            return;
        };
        // Needs to be here to comply with the borrow checker.
        let is_outbound = connected_peer.outbound;

        // For each topic, if a peer has grafted us, then we necessarily must be in their mesh
        // and they must be subscribed to the topic. Ensure we have recorded the mapping.
        for topic in &topics {
            if connected_peer.topics.insert(topic.clone()) {
                #[cfg(feature = "metrics")]
                if let Some(m) = self.metrics.as_mut() {
                    m.inc_topic_peers(topic);
                }
            }
        }

        // we don't GRAFT to/from explicit peers; complain loudly if this happens
        if self.explicit_peers.contains(peer_id) {
            tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
            // this is possibly a bug from non-reciprocal configuration; send a PRUNE for all topics
            to_prune_topics = topics.into_iter().collect();
            // but don't PX
            do_px = false
        } else {
            let (below_zero, score) = self.peer_score.below_threshold(peer_id, |_| 0.0);
            let now = Instant::now();
            for topic_hash in topics {
                if let Some(peers) = self.mesh.get_mut(&topic_hash) {
                    // if the peer is already in the mesh ignore the graft
                    if peers.contains(peer_id) {
                        tracing::debug!(
                            peer=%peer_id,
                            topic=%&topic_hash,
                            "GRAFT: Received graft for peer that is already in topic"
                        );
                        continue;
                    }

                    // make sure we are not backing off that peer
                    if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
                    {
                        if backoff_time > now {
                            tracing::warn!(
                                peer=%peer_id,
                                "[Penalty] Peer attempted graft within backoff time, penalizing"
                            );
                            // add behavioural penalty
                            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
                                #[cfg(feature = "metrics")]
                                if let Some(metrics) = self.metrics.as_mut() {
                                    metrics.register_score_penalty(Penalty::GraftBackoff);
                                }
                                peer_score.add_penalty(peer_id, 1);

                                // check the flood cutoff
                                // See: https://github.com/rust-lang/rust-clippy/issues/10061
                                #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
                                let flood_cutoff = (backoff_time
                                    + self.config.graft_flood_threshold())
                                    - self.config.prune_backoff();
                                if flood_cutoff > now {
                                    // extra penalty
1440                                    peer_score.add_penalty(peer_id, 1);
1441                                }
1442                            }
1443                            // no PX
1444                            do_px = false;
1445
1446                            to_prune_topics.insert(topic_hash.clone());
1447                            continue;
1448                        }
1449                    }
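                    // Worked arithmetic for the flood cutoff above (illustrative; assumes the
                    // backoff was our own default, i.e. backoff_time = T + prune_backoff() for a
                    // prune at time T):
                    //   flood_cutoff = (T + prune_backoff() + graft_flood_threshold())
                    //                   - prune_backoff()
                    //                = T + graft_flood_threshold()
                    // so a GRAFT arriving within graft_flood_threshold() of the PRUNE is treated
                    // as flooding and earns the extra penalty.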
1450
1451                    // check the score
1452                    if below_zero {
1453                        // we don't GRAFT peers with negative score
1454                        tracing::debug!(
1455                            peer=%peer_id,
1456                            %score,
1457                            topic=%topic_hash,
1458                            "GRAFT: ignoring peer with negative score"
1459                        );
1460                        // we do send them PRUNE however, because it's a matter of protocol
1461                        // correctness
1462                        to_prune_topics.insert(topic_hash.clone());
1463                        // but we won't PX to them
1464                        do_px = false;
1465                        continue;
1466                    }
1467
1468                    // check mesh upper bound and only allow graft if the upper bound is not reached
1469                    // or if it is an outbound peer
1470                    let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);
1471
1472                    if peers.len() >= mesh_n_high && !is_outbound {
1473                        to_prune_topics.insert(topic_hash.clone());
1474                        continue;
1475                    }
1476
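                    // Illustrative example (assumed figure): with mesh_n_high = 12 for this
                    // topic and 12 peers already in the mesh, an inbound GRAFT is answered with
                    // PRUNE, while a GRAFT arriving over an outbound connection is still
                    // accepted, preserving our outbound representation in the mesh.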
1477                    // add peer to the mesh
1478                    tracing::debug!(
1479                        peer=%peer_id,
1480                        topic=%topic_hash,
1481                        "GRAFT: Mesh link added for peer in topic"
1482                    );
1483
1484                    if peers.insert(*peer_id) {
1485                        #[cfg(feature = "metrics")]
1486                        if let Some(m) = self.metrics.as_mut() {
1487                            m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1488                        }
1489                    }
1490
1491                    // If the peer did not previously exist in any mesh, inform the handler
1492                    peer_added_to_mesh(
1493                        *peer_id,
1494                        vec![&topic_hash],
1495                        &self.mesh,
1496                        &mut self.events,
1497                        &self.connected_peers,
1498                    );
1499
1500                    if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1501                        peer_score.graft(peer_id, topic_hash);
1502                    }
1503                } else {
1504                    // don't do PX when there is an unknown topic to avoid leaking our peers
1505                    do_px = false;
1506                    tracing::debug!(
1507                        peer=%peer_id,
1508                        topic=%topic_hash,
1509                        "GRAFT: Received graft for unknown topic from peer"
1510                    );
1511                    // spam hardening: ignore GRAFTs for unknown topics
1512                    continue;
1513                }
1514            }
1515        }
1516
1517        if !to_prune_topics.is_empty() {
1518            // build the prune messages to send
1519            let on_unsubscribe = false;
1520
1521            for prune in to_prune_topics
1522                .iter()
1523                .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1524                .collect::<Vec<_>>()
1525            {
1526                self.send_message(*peer_id, RpcOut::Prune(prune));
1527            }
1528            // The PRUNE messages were queued above; log that we are pruning these topics.
1529            tracing::debug!(
1530                peer=%peer_id,
1531                "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1532            );
1532            );
1533        }
1534        tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1535    }
1536
1537    /// Removes the specified peer from the mesh, returning true if it was present.
1538    fn remove_peer_from_mesh(
1539        &mut self,
1540        peer_id: &PeerId,
1541        topic_hash: &TopicHash,
1542        backoff: Option<u64>,
1543        always_update_backoff: bool,
1544    ) -> bool {
1545        let mut peer_removed = false;
1546        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1547            // remove the peer if it exists in the mesh
1548            peer_removed = peers.remove(peer_id);
1549            if peer_removed {
1550                tracing::debug!(
1551                    peer=%peer_id,
1552                    topic=%topic_hash,
1553                    "PRUNE: Removing peer from the mesh for topic"
1554                );
1555
1556                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1557                    peer_score.prune(peer_id, topic_hash.clone());
1558                }
1559
1560                // inform the handler
1561                peer_removed_from_mesh(
1562                    *peer_id,
1563                    topic_hash,
1564                    &self.mesh,
1565                    &mut self.events,
1566                    &self.connected_peers,
1567                );
1568            }
1569        }
1570        if always_update_backoff || peer_removed {
1571            // Is there a backoff specified by the peer? If so, obey it.
1572            let time = if let Some(backoff) = backoff {
1573                Duration::from_secs(backoff)
1574            } else {
1575                self.config.prune_backoff()
1576            };
1577            self.backoffs.update_backoff(topic_hash, peer_id, time);
1578        }
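        // Example of the backoff selection above (values are illustrative assumptions): a PRUNE
        // carrying `backoff: Some(30)` keeps us from re-grafting this peer on the topic for 30
        // seconds, while a PRUNE without a backoff falls back to `config.prune_backoff()`.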
1579        peer_removed
1580    }
1581
1582    /// Handles PRUNE control messages. Removes peer from the mesh.
1583    fn handle_prune(
1584        &mut self,
1585        peer_id: &PeerId,
1586        prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1587    ) {
1588        tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1589        let (below_threshold, score) = self
1590            .peer_score
1591            .below_threshold(peer_id, |ts| ts.accept_px_threshold);
1592        for (topic_hash, px, backoff) in prune_data {
1593            if self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true) {
1594                #[cfg(feature = "metrics")]
1595                if let Some(m) = self.metrics.as_mut() {
1596                    m.peers_removed(&topic_hash, Churn::Prune, 1);
1597                }
1598            }
1599
1600            if self.mesh.contains_key(&topic_hash) {
1601                // connect to px peers
1602                if !px.is_empty() {
1603                    // we ignore PX from peers with insufficient score
1604                    if below_threshold {
1605                        tracing::debug!(
1606                            peer=%peer_id,
1607                            %score,
1608                            topic=%topic_hash,
1609                            "PRUNE: ignoring PX from peer with insufficient score"
1610                        );
1611                        continue;
1612                    }
1613
1614                    // NOTE: We cannot dial any peers from PX currently as we typically will not
1615                    // know their multiaddr. Until SignedRecords are spec'd this
1616                    // remains a stub. By default `config.prune_peers()` is set to zero and
1617                    // this is skipped. If the user modifies this, this will only be able to
1618                    // dial already known peers (from an external discovery mechanism for
1619                    // example).
1620                    if self.config.prune_peers() > 0 {
1621                        self.px_connect(px);
1622                    }
1623                }
1624            }
1625        }
1626        tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1627    }
1628
1629    fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1630        let n = self.config.prune_peers();
1631        // Ignore peerInfo with no ID
1632        //
1633        // TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
1634        // signed peer record?
1635        px.retain(|p| p.peer_id.is_some());
1636        if px.len() > n {
1637            // only use at most prune_peers many random peers
1638            let mut rng = thread_rng();
1639            px.partial_shuffle(&mut rng, n);
1640            px = px.into_iter().take(n).collect();
1641        }
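        // Illustrative example (assumed numbers): if the PRUNE carried 40 usable PX entries and
        // `prune_peers()` is 16, a random 16 of them are kept here and dialled below; the rest
        // are dropped.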
1642
1643        for p in px {
1644            // TODO: Once signed records are spec'd: extract signed peer record if given and handle
1645            // it, see https://github.com/libp2p/specs/pull/217
1646            if let Some(peer_id) = p.peer_id {
1647                // mark as px peer
1648                self.px_peers.insert(peer_id);
1649
1650                // dial peer
1651                self.events.push_back(ToSwarm::Dial {
1652                    opts: DialOpts::peer_id(peer_id).build(),
1653                });
1654            }
1655        }
1656    }
1657
1658    /// Applies some basic checks to determine whether this message is valid. Does not apply user
1659    /// validation checks.
1660    fn message_is_valid(
1661        &mut self,
1662        msg_id: &MessageId,
1663        raw_message: &mut RawMessage,
1664        propagation_source: &PeerId,
1665    ) -> bool {
1666        tracing::debug!(
1667            peer=%propagation_source,
1668            message_id=%msg_id,
1669            "Handling message from peer"
1670        );
1671
1672        // Reject any message from a blacklisted peer
1673        if self.blacklisted_peers.contains(propagation_source) {
1674            tracing::debug!(
1675                peer=%propagation_source,
1676                "Rejecting message from blacklisted peer"
1677            );
1678            self.gossip_promises
1679                .reject_message(msg_id, &RejectReason::BlackListedPeer);
1680            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1681                peer_score.reject_message(
1682                    propagation_source,
1683                    msg_id,
1684                    &raw_message.topic,
1685                    RejectReason::BlackListedPeer,
1686                );
1687            }
1688            return false;
1689        }
1690
1691        // Also reject any message that originated from a blacklisted peer
1692        if let Some(source) = raw_message.source.as_ref() {
1693            if self.blacklisted_peers.contains(source) {
1694                tracing::debug!(
1695                    peer=%propagation_source,
1696                    %source,
1697                    "Rejecting message from peer because of blacklisted source"
1698                );
1699                self.handle_invalid_message(
1700                    propagation_source,
1701                    &raw_message.topic,
1702                    Some(msg_id),
1703                    RejectReason::BlackListedSource,
1704                );
1705                return false;
1706            }
1707        }
1708
1709        // If we are not validating messages, assume this message is validated
1710        // This will allow the message to be gossiped without explicitly calling
1711        // `validate_message`.
1712        if !self.config.validate_messages() {
1713            raw_message.validated = true;
1714        }
1715
1716        // reject messages claiming to be from ourselves but not locally published
1717        let self_published = !self.config.allow_self_origin()
1718            && if let Some(own_id) = self.publish_config.get_own_id() {
1719                own_id != propagation_source
1720                    && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1721            } else {
1722                self.published_message_ids.contains(msg_id)
1723            };
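        // Example of the check above (hedged sketch): a message whose `source` field equals our
        // own peer id but which arrives from a different propagation source is treated as a
        // self-origin spoof or echo and dropped, unless `allow_self_origin()` is set. In
        // anonymous/unsigned modes, where no own id is available, the check falls back to the
        // set of message ids we actually published.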
1724
1725        if self_published {
1726            tracing::debug!(
1727                message_id=%msg_id,
1728                source=%propagation_source,
1729                "Dropping message claiming to be from self but forwarded from source"
1730            );
1731            self.handle_invalid_message(
1732                propagation_source,
1733                &raw_message.topic,
1734                Some(msg_id),
1735                RejectReason::SelfOrigin,
1736            );
1737            return false;
1738        }
1739
1740        true
1741    }
1742
1743    /// Handles a newly received [`RawMessage`].
1744    ///
1745    /// Forwards the message to all peers in the mesh.
1746    fn handle_received_message(
1747        &mut self,
1748        mut raw_message: RawMessage,
1749        propagation_source: &PeerId,
1750    ) {
1751        // Record the received metric
1752        #[cfg(feature = "metrics")]
1753        if let Some(metrics) = self.metrics.as_mut() {
1754            metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1755        }
1756
1757        // Try and perform the data transform to the message. If it fails, consider it invalid.
1758        let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1759            Ok(message) => message,
1760            Err(e) => {
1761                tracing::debug!("Invalid message. Transform error: {:?}", e);
1762                // Reject the message and return
1763                self.handle_invalid_message(
1764                    propagation_source,
1765                    &raw_message.topic,
1766                    None,
1767                    RejectReason::ValidationError(ValidationError::TransformFailed),
1768                );
1769                return;
1770            }
1771        };
1772
1773        // Calculate the message id on the transformed data.
1774        let msg_id = self.config.message_id(&message);
1775
1776        // Broadcast IDONTWANT messages
1777        if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() {
1778            let recipient_peers = self
1779                .mesh
1780                .get(&message.topic)
1781                .map(|mesh| mesh.iter())
1782                .unwrap_or_default()
1783                .copied()
1784                .chain(self.gossip_promises.peers_for_message(&msg_id))
1785                .filter(|peer_id| {
1786                    peer_id != propagation_source && Some(peer_id) != message.source.as_ref()
1787                })
1788                .collect::<Vec<PeerId>>();
1789
1790            for peer_id in recipient_peers {
1791                self.send_message(
1792                    peer_id,
1793                    RpcOut::IDontWant(IDontWant {
1794                        message_ids: vec![msg_id.clone()],
1795                    }),
1796                );
1797            }
1798        }
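        // Illustrative example (the threshold value is an assumption, not necessarily the
        // default): with an idontwant_message_size_threshold() of 1000 bytes, receiving a 5 kB
        // message makes us immediately send IDONTWANT carrying its message id to our mesh peers
        // for the topic (and to peers with outstanding promises for it), so they can avoid
        // sending us the full payload again.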
1799
1800        // Check the validity of the message
1801        // Peers get penalized if this message is invalid. We don't add it to the duplicate cache
1802        // and instead continually penalize peers that repeatedly send this message.
1803        if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1804            return;
1805        }
1806
1807        if !self.duplicate_cache.insert(msg_id.clone()) {
1808            tracing::debug!(message_id=%msg_id, "Message already received, ignoring");
1809            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1810                peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1811            }
1812            self.mcache.observe_duplicate(&msg_id, propagation_source);
1813            return;
1814        }
1815
1816        tracing::debug!(
1817            message_id=%msg_id,
1818            "Put message in duplicate_cache and resolve promises"
1819        );
1820
1821        // Record the received message with the metrics
1822        #[cfg(feature = "metrics")]
1823        if let Some(metrics) = self.metrics.as_mut() {
1824            metrics.msg_recvd(&message.topic);
1825        }
1826
1827        // Consider the message as delivered for gossip promises; the peer score update for
1828        // this arrival happens just below.
1829        self.gossip_promises.message_delivered(&msg_id);
1830
1831        // Tells score that message arrived (but is maybe not fully validated yet).
1832        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1833            peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1834        }
1835
1836        // Add the message to our memcache
1837        self.mcache.put(&msg_id, raw_message.clone());
1838
1839        // Dispatch the message to the user if we are subscribed to any of the topics
1840        #[allow(
1841            clippy::map_entry,
1842            reason = "False positive, see rust-lang/rust-clippy#14449."
1843        )]
1844        if self.mesh.contains_key(&message.topic) {
1845            tracing::debug!("Sending received message to user");
1846            self.events
1847                .push_back(ToSwarm::GenerateEvent(Event::Message {
1848                    propagation_source: *propagation_source,
1849                    message_id: msg_id.clone(),
1850                    message,
1851                }));
1852        } else {
1853            tracing::debug!(
1854                topic=%message.topic,
1855                "Received message on a topic we are not subscribed to"
1856            );
1857            return;
1858        }
1859
1860        // forward the message to mesh peers, if no validation is required
1861        if !self.config.validate_messages() {
1862            self.forward_msg(
1863                &msg_id,
1864                raw_message,
1865                Some(propagation_source),
1866                HashSet::new(),
1867            );
1868            tracing::debug!(message_id=%msg_id, "Completed message handling for message");
1869        }
1870    }
1871
1872    // Handles invalid messages received.
1873    fn handle_invalid_message(
1874        &mut self,
1875        propagation_source: &PeerId,
1876        topic_hash: &TopicHash,
1877        message_id: Option<&MessageId>,
1878        reject_reason: RejectReason,
1879    ) {
1880        #[cfg(feature = "metrics")]
1881        if let Some(metrics) = self.metrics.as_mut() {
1882            metrics.register_invalid_message(topic_hash);
1883        }
1884        if let Some(msg_id) = message_id {
1885            // The message id is known, so reject its gossip promises regardless of peer scoring.
1886            self.gossip_promises.reject_message(msg_id, &reject_reason);
1887        }
1888        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1889            // The compiler will optimize this pattern-matching
1890            if let Some(msg_id) = message_id {
1891                // The message itself is valid, but is from a banned peer or
1892                // claiming to be self-origin but is actually forwarded from other peers.
1893                peer_score.reject_message(propagation_source, msg_id, topic_hash, reject_reason);
1894            } else {
1895                // The message is invalid, so we reject it, ignoring any gossip promises. If a peer
1896                // is advertising this message via an IHAVE and it's invalid, it will be penalized
1897                // twice: once for sending us an invalid message and again for breaking a promise.
1898                peer_score.reject_invalid_message(propagation_source, topic_hash);
1899            }
1900        }
1901    }
1902
1903    /// Handles received subscriptions.
1904    fn handle_received_subscriptions(
1905        &mut self,
1906        subscriptions: &[Subscription],
1907        propagation_source: &PeerId,
1908    ) {
1909        tracing::trace!(
1910            source=%propagation_source,
1911            "Handling subscriptions: {:?}",
1912            subscriptions,
1913        );
1914
1915        let mut unsubscribed_peers = Vec::new();
1916
1917        let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1918            tracing::error!(
1919                peer=%propagation_source,
1920                "Subscription by unknown peer"
1921            );
1922            return;
1923        };
1924
1925        // Collect potential graft topics for the peer.
1926        let mut topics_to_graft = Vec::new();
1927
1928        // Notify the application about the subscription, after the grafts are sent.
1929        let mut application_event = Vec::new();
1930
1931        let filtered_topics = match self
1932            .subscription_filter
1933            .filter_incoming_subscriptions(subscriptions, &peer.topics)
1934        {
1935            Ok(topics) => topics,
1936            Err(s) => {
1937                tracing::error!(
1938                    peer=%propagation_source,
1939                    "Subscription filter error: {}; ignoring RPC from peer",
1940                    s
1941                );
1942                return;
1943            }
1944        };
1945
1946        for subscription in filtered_topics {
1947            // get the peers from the mapping, or insert empty lists if the topic doesn't exist
1948            let topic_hash = &subscription.topic_hash;
1949
1950            match subscription.action {
1951                SubscriptionAction::Subscribe => {
1952                    if peer.topics.insert(topic_hash.clone()) {
1953                        tracing::debug!(
1954                            peer=%propagation_source,
1955                            topic=%topic_hash,
1956                            "SUBSCRIPTION: Adding gossip peer to topic"
1957                        );
1958
1959                        #[cfg(feature = "metrics")]
1960                        if let Some(m) = self.metrics.as_mut() {
1961                            m.inc_topic_peers(topic_hash);
1962                        }
1963                    }
1964
1965                    // if the mesh needs peers, add the peer to the mesh
1966                    if !self.explicit_peers.contains(propagation_source)
1967                        && peer.kind.is_gossipsub()
1968                        && !self
1969                            .peer_score
1970                            .below_threshold(propagation_source, |_| 0.0)
1971                            .0
1972                        && !self
1973                            .backoffs
1974                            .is_backoff_with_slack(topic_hash, propagation_source)
1975                    {
1976                        if let Some(peers) = self.mesh.get_mut(topic_hash) {
1977                            let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
1978
1979                            if peers.len() < mesh_n_low && peers.insert(*propagation_source) {
1980                                tracing::debug!(
1981                                    peer=%propagation_source,
1982                                    topic=%topic_hash,
1983                                    "SUBSCRIPTION: Adding peer to the mesh for topic"
1984                                );
1985                                #[cfg(feature = "metrics")]
1986                                if let Some(m) = self.metrics.as_mut() {
1987                                    m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1988                                }
1989                                // send graft to the peer
1990                                tracing::debug!(
1991                                    peer=%propagation_source,
1992                                    topic=%topic_hash,
1993                                    "Sending GRAFT to peer for topic"
1994                                );
1995                                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
1996                                    peer_score.graft(propagation_source, topic_hash.clone());
1997                                }
1998                                topics_to_graft.push(topic_hash.clone());
1999                            }
2000                        }
2001                    }
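                    // Illustrative example (assumed figure): with mesh_n_low = 5 for this topic,
                    // a new subscriber is grafted straight into a 3-peer mesh, while a mesh
                    // already holding 5 or more peers leaves further additions to the heartbeat.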
2002                    // generates a subscription event to be polled
2003                    application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
2004                        peer_id: *propagation_source,
2005                        topic: topic_hash.clone(),
2006                    }));
2007                }
2008                SubscriptionAction::Unsubscribe => {
2009                    if peer.topics.remove(topic_hash) {
2010                        tracing::debug!(
2011                            peer=%propagation_source,
2012                            topic=%topic_hash,
2013                            "SUBSCRIPTION: Removing gossip peer from topic"
2014                        );
2015
2016                        #[cfg(feature = "metrics")]
2017                        if let Some(m) = self.metrics.as_mut() {
2018                            m.dec_topic_peers(topic_hash);
2019                        }
2020                    }
2021
2022                    unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
2023                    // generate an unsubscribe event to be polled
2024                    application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
2025                        peer_id: *propagation_source,
2026                        topic: topic_hash.clone(),
2027                    }));
2028                }
2029            }
2030        }
2031
2032        // remove unsubscribed peers from the mesh and fanout if they exist there.
2033        for (peer_id, topic_hash) in unsubscribed_peers {
2034            if let Some(peers) = self.fanout.get_mut(&topic_hash) {
2035                peers.remove(&peer_id);
2036            }
2037            if self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false) {
2038                #[cfg(feature = "metrics")]
2039                if let Some(m) = self.metrics.as_mut() {
2040                    m.peers_removed(&topic_hash, Churn::Unsub, 1);
2041                }
2042            };
2043        }
2044
2045        // Potentially inform the handler if we have added this peer to a mesh for the first time.
2046        let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
2047        if !topics_joined.is_empty() {
2048            peer_added_to_mesh(
2049                *propagation_source,
2050                topics_joined,
2051                &self.mesh,
2052                &mut self.events,
2053                &self.connected_peers,
2054            );
2055        }
2056
2057        // If we need to send GRAFTs to the peer, do so immediately rather than waiting for the
2058        // heartbeat.
2059        for topic_hash in topics_to_graft.into_iter() {
2060            self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
2061        }
2062
2063        // Notify the application of the subscriptions
2064        for event in application_event {
2065            self.events.push_back(event);
2066        }
2067
2068        tracing::trace!(
2069            source=%propagation_source,
2070            "Completed handling subscriptions from source"
2071        );
2072    }
2073
2074    /// Applies penalties to peers that did not respond to our IWANT requests.
2075    fn apply_iwant_penalties(&mut self) {
2076        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2077            for (peer, count) in self.gossip_promises.get_broken_promises() {
2078                peer_score.add_penalty(&peer, count);
2079                #[cfg(feature = "metrics")]
2080                if let Some(metrics) = self.metrics.as_mut() {
2081                    metrics.register_score_penalty(Penalty::BrokenPromise);
2082                }
2083            }
2084        }
2085    }
2086
2087    /// Heartbeat function which shifts the memcache and updates the mesh.
2088    fn heartbeat(&mut self) {
2089        #[cfg(feature = "metrics")]
2090        let start = Instant::now();
2091
2092        // Every heartbeat we sample the send queues to add to our metrics. We do this intentionally
2093        // before we add all the gossip from this heartbeat in order to gain a true measure of the
2094        // steady-state size of the queues.
2095        #[cfg(feature = "metrics")]
2096        if let Some(m) = &mut self.metrics {
2097            for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2098                m.observe_priority_queue_size(sender_queue.priority_queue_len());
2099                m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2100            }
2101        }
2102
2103        self.heartbeat_ticks += 1;
2104
2105        let mut to_graft = HashMap::new();
2106        let mut to_prune = HashMap::new();
2107        let mut no_px = HashSet::new();
2108
2109        // clean up expired backoffs
2110        self.backoffs.heartbeat();
2111
2112        // clean up ihave counters
2113        self.count_sent_iwant.clear();
2114        self.count_received_ihave.clear();
2115
2116        // apply iwant penalties
2117        self.apply_iwant_penalties();
2118
2119        // check connections to explicit peers
2120        if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2121            for p in self.explicit_peers.clone() {
2122                self.check_explicit_peer_connection(&p);
2123            }
2124        }
2125
2126        // Cache the scores of all connected peers, and record metrics for current penalties.
2127        let mut scores = HashMap::with_capacity(self.connected_peers.len());
2128        if let PeerScoreState::Active(peer_score) = &self.peer_score {
2129            for peer_id in self.connected_peers.keys() {
2130                #[allow(unused_variables)]
2131                let report = scores
2132                    .entry(peer_id)
2133                    .or_insert_with(|| peer_score.score_report(peer_id));
2134
2135                #[cfg(feature = "metrics")]
2136                if let Some(metrics) = self.metrics.as_mut() {
2137                    for penalty in &report.penalties {
2138                        metrics.register_score_penalty(*penalty);
2139                    }
2140                }
2141            }
2142        }
2143
2144        // maintain the mesh for each topic
2145        for (topic_hash, peers) in self.mesh.iter_mut() {
2146            let explicit_peers = &self.explicit_peers;
2147            let backoffs = &self.backoffs;
2148
2149            let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2150            let mesh_n_low = self.config.mesh_n_low_for_topic(topic_hash);
2151            let mesh_n_high = self.config.mesh_n_high_for_topic(topic_hash);
2152            let mesh_outbound_min = self.config.mesh_outbound_min_for_topic(topic_hash);
2153
2154            // drop all peers with negative score, without PX
2155            // if there is at some point a stable retain method for BTreeSet the following can be
2156            // written more efficiently with retain.
2157            let mut to_remove_peers = Vec::new();
2158            for peer_id in peers.iter() {
2159                let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2160
2161                // Record the score per mesh
2162                #[cfg(feature = "metrics")]
2163                if let Some(metrics) = self.metrics.as_mut() {
2164                    metrics.observe_mesh_peers_score(topic_hash, peer_score);
2165                }
2166
2167                if peer_score < 0.0 {
2168                    tracing::debug!(
2169                        peer=%peer_id,
2170                        score=%peer_score,
2171                        topic=%topic_hash,
2172                        "HEARTBEAT: Prune peer with negative score"
2173                    );
2174
2175                    let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2176                    current_topic.push(topic_hash.clone());
2177                    no_px.insert(*peer_id);
2178                    to_remove_peers.push(*peer_id);
2179                }
2180            }
2181
2182            #[cfg(feature = "metrics")]
2183            if let Some(m) = self.metrics.as_mut() {
2184                m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2185            }
2186
2187            for peer_id in to_remove_peers {
2188                peers.remove(&peer_id);
2189            }
2190
2191            // too few peers - add some
2192            if peers.len() < mesh_n_low {
2193                tracing::debug!(
2194                    topic=%topic_hash,
2195                    "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2196                    peers.len(),
2197                    mesh_n
2198                );
2199                // not enough peers - get mesh_n - current_length more
2200                let desired_peers = mesh_n - peers.len();
2201                let peer_list =
2202                    get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2203                        !peers.contains(peer)
2204                            && !explicit_peers.contains(peer)
2205                            && !backoffs.is_backoff_with_slack(topic_hash, peer)
2206                            && scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2207                    });
2208                for peer in &peer_list {
2209                    let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2210                    current_topic.push(topic_hash.clone());
2211                }
2212                // update the mesh
2213                tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2214                #[cfg(feature = "metrics")]
2215                if let Some(m) = self.metrics.as_mut() {
2216                    m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2217                }
2218                peers.extend(peer_list);
2219            }
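            // Worked example for the branch above (assumed figures, not necessarily the
            // defaults): with mesh_n_low = 5 and mesh_n = 6, a mesh that has dropped to 3 peers
            // requests 6 - 3 = 3 additional random, non-backed-off, non-negative-score peers and
            // marks them for GRAFT.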
2220
2221            // too many peers - remove some
2222            if peers.len() > mesh_n_high {
2223                tracing::debug!(
2224                    topic=%topic_hash,
2225                    "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
2226                    peers.len(),
2227                    mesh_n
2228                );
2229                let excess_peer_no = peers.len() - mesh_n;
2230
2231                // shuffle the peers and then sort by score ascending beginning with the worst
2232                let mut rng = thread_rng();
2233                let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2234                shuffled.shuffle(&mut rng);
2235                shuffled.sort_by(|p1, p2| {
2236                    let score_p1 = scores.get(p1).map(|r| r.score).unwrap_or_default();
2237                    let score_p2 = scores.get(p2).map(|r| r.score).unwrap_or_default();
2238
2239                    score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2240                });
2241                // shuffle everything except the last retain_scores many peers (the best ones)
2242                shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2243
2244                // count total number of outbound peers
2245                let mut outbound = shuffled
2246                    .iter()
2247                    .filter(|peer_id| {
2248                        self.connected_peers
2249                            .get(peer_id)
2250                            .is_some_and(|peer| peer.outbound)
2251                    })
2252                    .count();
2253
2254                // remove the first excess_peer_no peers permitted by the outbound restrictions,
2255                // adding them to to_prune
2256                let mut removed = 0;
2257                for peer in shuffled {
2258                    if removed == excess_peer_no {
2259                        break;
2260                    }
2261                    if self
2262                        .connected_peers
2263                        .get(&peer)
2264                        .is_some_and(|peer| peer.outbound)
2265                    {
2266                        if outbound <= mesh_outbound_min {
2267                            // do not remove anymore outbound peers
2268                            continue;
2269                        }
2270                        // an outbound peer gets removed
2271                        outbound -= 1;
2272                    }
2273
2274                    // remove the peer
2275                    peers.remove(&peer);
2276                    let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2277                    current_topic.push(topic_hash.clone());
2278                    removed += 1;
2279                }
2280
2281                #[cfg(feature = "metrics")]
2282                if let Some(m) = self.metrics.as_mut() {
2283                    m.peers_removed(topic_hash, Churn::Excess, removed)
2284                }
2285            }
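            // Worked example for the branch above (assumed figures): with mesh_n = 6,
            // mesh_n_high = 12 and retain_scores = 4, a mesh of 14 peers removes 14 - 6 = 8 of
            // them. Removal walks the lower-scoring, shuffled portion of the list first, so the
            // 4 best-scoring peers are only touched if earlier candidates were skipped by the
            // outbound restriction; removed peers are queued for PRUNE.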
2286
2287            // do we have enough outbound peers?
2288            if peers.len() >= mesh_n_low {
2289                // count number of outbound peers we have
2290                let outbound = peers
2291                    .iter()
2292                    .filter(|peer_id| {
2293                        self.connected_peers
2294                            .get(peer_id)
2295                            .is_some_and(|peer| peer.outbound)
2296                    })
2297                    .count();
2298
2299                // if we do not have enough outbound peers, graft some new outbound peers
2300                if outbound < mesh_outbound_min {
2301                    let needed = mesh_outbound_min - outbound;
2302                    let peer_list =
2303                        get_random_peers(&self.connected_peers, topic_hash, needed, |peer_id| {
2304                            !peers.contains(peer_id)
2305                                && !explicit_peers.contains(peer_id)
2306                                && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2307                                && scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2308                                && self
2309                                    .connected_peers
2310                                    .get(peer_id)
2311                                    .is_some_and(|peer| peer.outbound)
2312                        });
2313
2314                    for peer in &peer_list {
2315                        let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2316                        current_topic.push(topic_hash.clone());
2317                    }
2318                    // update the mesh
2319                    tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2320                    #[cfg(feature = "metrics")]
2321                    if let Some(m) = self.metrics.as_mut() {
2322                        m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2323                    }
2324                    peers.extend(peer_list);
2325                }
2326            }
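            // Illustrative example (assumed figure): with mesh_outbound_min = 2, a mesh that
            // currently contains only inbound connections still grafts two outbound peers here,
            // provided suitable outbound peers are known, which limits how easily an attacker
            // controlling inbound connections can dominate the mesh.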
2327
2328            // should we try to improve the mesh with opportunistic grafting?
2329            if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2330                && peers.len() > 1
2331            {
2332                if let PeerScoreState::Active(peer_score) = &self.peer_score {
2333                    // Opportunistic grafting works as follows: we check the median score of peers
2334                    // in the mesh; if this score is below the opportunisticGraftThreshold, we
2335                    // select a few peers at random with score over the median.
2336                    // The intention is to (slowly) improve an underperforming mesh by introducing
2337                    // well-scoring peers that may have been gossiping to us. This allows us to
2338                    // get out of sticky situations where we are stuck with poor peers and also
2339                    // recover from churn of good peers.
2340
2341                    // now compute the median peer score in the mesh
2342                    let mut peers_by_score: Vec<_> = peers.iter().collect();
2343                    peers_by_score.sort_by(|p1, p2| {
2344                        let p1_score = scores.get(p1).map(|r| r.score).unwrap_or_default();
2345                        let p2_score = scores.get(p2).map(|r| r.score).unwrap_or_default();
2346                        p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2347                    });
2348
2349                    let middle = peers_by_score.len() / 2;
2350                    let median = if peers_by_score.len() % 2 == 0 {
2351                        let sub_middle_peer = *peers_by_score
2352                            .get(middle - 1)
2353                            .expect("middle < vector length and middle > 0 since peers.len() > 0");
2354                        let sub_middle_score = scores
2355                            .get(sub_middle_peer)
2356                            .map(|r| r.score)
2357                            .unwrap_or_default();
2358                        let middle_peer =
2359                            *peers_by_score.get(middle).expect("middle < vector length");
2360                        let middle_score =
2361                            scores.get(middle_peer).map(|r| r.score).unwrap_or_default();
2362
2363                        (sub_middle_score + middle_score) * 0.5
2364                    } else {
2365                        scores
2366                            .get(*peers_by_score.get(middle).expect("middle < vector length"))
2367                            .map(|r| r.score)
2368                            .unwrap_or_default()
2369                    };
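                    // Worked example of the median above (illustrative scores): for mesh scores
                    // [-2.0, 0.5, 3.0] the median is 0.5; for [-2.0, 0.5, 3.0, 4.0] it is
                    // (0.5 + 3.0) * 0.5 = 1.75. Only when this value falls below
                    // `opportunistic_graft_threshold` do we graft extra peers below.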
2370
2371                    // if the median score is below the threshold, select a better peer (if any) and
2372                    // GRAFT
2373                    if median < peer_score.thresholds.opportunistic_graft_threshold {
2374                        let peer_list = get_random_peers(
2375                            &self.connected_peers,
2376                            topic_hash,
2377                            self.config.opportunistic_graft_peers(),
2378                            |peer_id| {
2379                                !peers.contains(peer_id)
2380                                    && !explicit_peers.contains(peer_id)
2381                                    && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2382                                    && scores.get(peer_id).map(|r| r.score).unwrap_or_default()
2383                                        > median
2384                            },
2385                        );
2386                        for peer in &peer_list {
2387                            let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2388                            current_topic.push(topic_hash.clone());
2389                        }
2390                        // update the mesh
2391                        tracing::debug!(
2392                            topic=%topic_hash,
2393                            "Opportunistically graft in topic with peers {:?}",
2394                            peer_list
2395                        );
2396                        #[cfg(feature = "metrics")]
2397                        if let Some(m) = self.metrics.as_mut() {
2398                            m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2399                        }
2400                        peers.extend(peer_list);
2401                    }
2402                }
2403            }
2404            // Register the final count of peers in the mesh
2405            #[cfg(feature = "metrics")]
2406            if let Some(m) = self.metrics.as_mut() {
2407                m.set_mesh_peers(topic_hash, peers.len())
2408            }
2409        }
2410
2411        // remove expired fanout topics
2412        {
2413            let fanout = &mut self.fanout; // help the borrow checker
2414            let fanout_ttl = self.config.fanout_ttl();
2415            self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2416                if *last_pub_time + fanout_ttl < Instant::now() {
2417                    tracing::debug!(
2418                        topic=%topic_hash,
2419                        "HEARTBEAT: Fanout topic removed due to timeout"
2420                    );
2421                    fanout.remove(topic_hash);
2422                    return false;
2423                }
2424                true
2425            });
2426        }
2427
2428        // maintain fanout
2429        // check if our peers are still a part of the topic
2430        for (topic_hash, peers) in self.fanout.iter_mut() {
2431            let mut to_remove_peers = Vec::new();
2432            let publish_threshold = match &self.peer_score {
2433                PeerScoreState::Active(peer_score) => peer_score.thresholds.publish_threshold,
2434                _ => 0.0,
2435            };
2436            let mesh_n = self.config.mesh_n_for_topic(topic_hash);
2437
2438            for peer_id in peers.iter() {
2439                // is the peer still subscribed to the topic?
2440                let peer_score = scores.get(peer_id).map(|r| r.score).unwrap_or_default();
2441                match self.connected_peers.get(peer_id) {
2442                    Some(peer) => {
2443                        if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2444                            tracing::debug!(
2445                                topic=%topic_hash,
2446                                "HEARTBEAT: Peer removed from fanout for topic"
2447                            );
2448                            to_remove_peers.push(*peer_id);
2449                        }
2450                    }
2451                    None => {
2452                        // remove if the peer has disconnected
2453                        to_remove_peers.push(*peer_id);
2454                    }
2455                }
2456            }
2457            for to_remove in to_remove_peers {
2458                peers.remove(&to_remove);
2459            }
2460
2461            // not enough peers
2462            if peers.len() < mesh_n {
2463                tracing::debug!(
2464                    "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2465                    peers.len(),
2466                    mesh_n
2467                );
2468                let needed_peers = mesh_n - peers.len();
2469                let explicit_peers = &self.explicit_peers;
2470                let new_peers =
2471                    get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2472                        !peers.contains(peer_id)
2473                            && !explicit_peers.contains(peer_id)
2474                            && !self
2475                                .peer_score
2476                                .below_threshold(peer_id, |ts| ts.publish_threshold)
2477                                .0
2478                    });
2479                peers.extend(new_peers);
2480            }
2481        }
2482
2483        if let PeerScoreState::Active(peer_score) = &self.peer_score {
2484            tracing::trace!("Mesh message deliveries: {:?}", {
2485                self.mesh
2486                    .iter()
2487                    .map(|(t, peers)| {
2488                        (
2489                            t.clone(),
2490                            peers
2491                                .iter()
2492                                .map(|p| {
2493                                    (*p, peer_score.mesh_message_deliveries(p, t).unwrap_or(0.0))
2494                                })
2495                                .collect::<HashMap<PeerId, f64>>(),
2496                        )
2497                    })
2498                    .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2499            })
2500        }
2501
2502        self.emit_gossip();
2503
2504        // send graft/prunes
2505        if !to_graft.is_empty() || !to_prune.is_empty() {
2506            self.send_graft_prune(to_graft, to_prune, no_px);
2507        }
2508
2509        // shift the memcache
2510        self.mcache.shift();
2511
2512        // Report expired messages
2513        for (peer_id, failed_messages) in self.failed_messages.drain() {
2514            tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2515            self.events
2516                .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2517                    peer_id,
2518                    failed_messages,
2519                }));
2520        }
2521        self.failed_messages.shrink_to_fit();
2522
2523        // Flush stale IDONTWANTs.
2524        for peer in self.connected_peers.values_mut() {
2525            while let Some((_front, instant)) = peer.dont_send.front() {
2526                if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
2527                    break;
2528                } else {
2529                    peer.dont_send.pop_front();
2530                }
2531            }
2532        }
2533
2534        #[cfg(feature = "metrics")]
2535        if let Some(metrics) = self.metrics.as_mut() {
2536            let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2537            metrics.observe_heartbeat_duration(duration);
2538        }
2539    }
2540
2541    /// Emits gossip: sends IHAVE messages to a random set of gossip peers. This is applied to mesh
2542    /// and fanout peers.
2543    fn emit_gossip(&mut self) {
2544        let mut rng = thread_rng();
2545        let mut messages = Vec::new();
2546        for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2547            let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2548            if message_ids.is_empty() {
2549                continue;
2550            }
2551
2552            // if we are emitting more than GossipSubMaxIHaveLength message_ids, truncate the list
2553            if message_ids.len() > self.config.max_ihave_length() {
2554                // we do the truncation (with shuffling) per peer below
2555                tracing::debug!(
2556                    "too many messages for gossip; will truncate IHAVE list ({} messages)",
2557                    message_ids.len()
2558                );
2559            } else {
2560                // shuffle to emit in random order
2561                message_ids.shuffle(&mut rng);
2562            }
2563
2564            // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
2565            let n_map = |m| {
2566                max(
2567                    self.config.gossip_lazy(),
2568                    (self.config.gossip_factor() * m as f64) as usize,
2569                )
2570            };
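            // Worked example (assumed figures, e.g. gossip_lazy = 6 and gossip_factor = 0.25):
            // for a topic with roughly 40 eligible non-mesh peers,
            //   n_map(40) = max(6, (0.25 * 40.0) as usize) = max(6, 10) = 10,
            // so IHAVE is emitted to 10 of them; small topics fall back to the gossip_lazy floor.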
2571            // get gossip_lazy random peers
2572            let to_msg_peers =
2573                get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2574                    !peers.contains(peer)
2575                        && !self.explicit_peers.contains(peer)
2576                        && !self
2577                            .peer_score
2578                            .below_threshold(peer, |ts| ts.gossip_threshold)
2579                            .0
2580                });
2581
2582            tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2583
2584            for peer_id in to_msg_peers {
2585                let mut peer_message_ids = message_ids.clone();
2586
2587                if peer_message_ids.len() > self.config.max_ihave_length() {
2588                    // We do this per peer so that we emit a different set for each peer.
2589                    // We have enough redundancy in the system that this will significantly increase
2590                    // the message coverage when we do truncate.
2591                    peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2592                    peer_message_ids.truncate(self.config.max_ihave_length());
2593                }
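                // Illustrative example (assumed limit): with max_ihave_length = 5000 and 8000
                // gossipable ids for the topic, each selected peer receives its own random
                // 5000-id subset, so across several peers most of the 8000 ids are still
                // advertised somewhere.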
2594
2595                // send an IHAVE message
2596                messages.push((
2597                    peer_id,
2598                    RpcOut::IHave(IHave {
2599                        topic_hash: topic_hash.clone(),
2600                        message_ids: peer_message_ids,
2601                    }),
2602                ));
2603            }
2604        }
2605        for (peer_id, message) in messages {
2606            self.send_message(peer_id, message);
2607        }
2608    }
2609
2610    /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control
2611    /// messages.
2612    fn send_graft_prune(
2613        &mut self,
2614        to_graft: HashMap<PeerId, Vec<TopicHash>>,
2615        mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2616        no_px: HashSet<PeerId>,
2617    ) {
2618        // handle the grafts and overlapping prunes per peer
2619        for (peer_id, topics) in to_graft.into_iter() {
2620            for topic in &topics {
2621                // inform scoring of graft
2622                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2623                    peer_score.graft(&peer_id, topic.clone());
2624                }
2625
2626                // If the peer did not previously exist in any mesh, inform the handler
2627                // that it has now been added to one.
2628                peer_added_to_mesh(
2629                    peer_id,
2630                    vec![topic],
2631                    &self.mesh,
2632                    &mut self.events,
2633                    &self.connected_peers,
2634                );
2635            }
2636            let rpc_msgs = topics.iter().map(|topic_hash| {
2637                RpcOut::Graft(Graft {
2638                    topic_hash: topic_hash.clone(),
2639                })
2640            });
2641
2642            // If there are prunes associated with the same peer, add them.
2643            // NOTE: In this case a peer has been added to a topic mesh, and removed from another.
2644            // It therefore must be in at least one mesh and we do not need to inform the handler
2645            // of its removal from another.
2646
2647            // The following prunes are not due to unsubscribing.
2648            let prune_msgs = to_prune
2649                .remove(&peer_id)
2650                .into_iter()
2651                .flatten()
2652                .map(|topic_hash| {
2653                    let prune = self.make_prune(
2654                        &topic_hash,
2655                        &peer_id,
2656                        self.config.do_px() && !no_px.contains(&peer_id),
2657                        false,
2658                    );
2659                    RpcOut::Prune(prune)
2660                });
2661
2662            // send the rpc messages
2663            for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2664                self.send_message(peer_id, msg);
2665            }
2666        }
2667
2668        // handle the remaining prunes
2669        // The following prunes are not due to unsubscribing.
2670        for (peer_id, topics) in to_prune.iter() {
2671            for topic_hash in topics {
2672                let prune = self.make_prune(
2673                    topic_hash,
2674                    peer_id,
2675                    self.config.do_px() && !no_px.contains(peer_id),
2676                    false,
2677                );
2678                self.send_message(*peer_id, RpcOut::Prune(prune));
2679
2680                // inform the handler
2681                peer_removed_from_mesh(
2682                    *peer_id,
2683                    topic_hash,
2684                    &self.mesh,
2685                    &mut self.events,
2686                    &self.connected_peers,
2687                );
2688            }
2689        }
2690    }
2691
2692    /// Helper function which forwards a message to mesh\[topic\] peers.
2693    ///
2694    /// Returns true if at least one peer was messaged.
2695    fn forward_msg(
2696        &mut self,
2697        msg_id: &MessageId,
2698        message: RawMessage,
2699        propagation_source: Option<&PeerId>,
2700        originating_peers: HashSet<PeerId>,
2701    ) -> bool {
2702        // The message is fully validated; inform the peer score.
2703        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2704            if let Some(peer) = propagation_source {
2705                peer_score.deliver_message(peer, msg_id, &message.topic);
2706            }
2707        }
2708
2709        tracing::debug!(message_id=%msg_id, "Forwarding message");
2710        let mut recipient_peers = HashSet::new();
2711
2712        // Populate the recipient peers mapping
2713
2714        // Add explicit peers and floodsub peers
2715        for (peer_id, peer) in &self.connected_peers {
2716            if Some(peer_id) != propagation_source
2717                && !originating_peers.contains(peer_id)
2718                && Some(peer_id) != message.source.as_ref()
2719                && peer.topics.contains(&message.topic)
2720                && (self.explicit_peers.contains(peer_id)
2721                    || (peer.kind == PeerKind::Floodsub
2722                        && !self
2723                            .peer_score
2724                            .below_threshold(peer_id, |ts| ts.publish_threshold)
2725                            .0))
2726            {
2727                recipient_peers.insert(*peer_id);
2728            }
2729        }
2730
2731        // Add mesh peers for the message's topic
2732        let topic = &message.topic;
2734        if let Some(mesh_peers) = self.mesh.get(topic) {
2735            for peer_id in mesh_peers {
2736                if Some(peer_id) != propagation_source
2737                    && !originating_peers.contains(peer_id)
2738                    && Some(peer_id) != message.source.as_ref()
2739                {
2740                    recipient_peers.insert(*peer_id);
2741                }
2742            }
2743        }
2744
2745        if recipient_peers.is_empty() {
2746            return false;
2747        }
2748
2749        // forward the message to peers
2750        for peer_id in recipient_peers.iter() {
2751            if let Some(peer) = self.connected_peers.get_mut(peer_id) {
2752                if peer.dont_send.contains_key(msg_id) {
2753                    tracing::debug!(%peer_id, message_id=%msg_id, "Peer doesn't want message");
2754                    continue;
2755                }
2756
2757                tracing::debug!(%peer_id, message_id=%msg_id, "Sending message to peer");
2758
2759                self.send_message(
2760                    *peer_id,
2761                    RpcOut::Forward {
2762                        message: message.clone(),
2763                        timeout: Delay::new(self.config.forward_queue_duration()),
2764                    },
2765                );
2766            }
2767        }
2768        tracing::debug!("Completed forwarding message");
2769        true
2770    }
2771
2772    /// Constructs a [`RawMessage`] performing message signing if required.
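    ///
    /// When signing is enabled, the signature covers the bytes
    /// `"libp2p-pubsub:" ++ <protobuf-encoded message>`, matching the go implementation.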
2773    pub(crate) fn build_raw_message(
2774        &mut self,
2775        topic: TopicHash,
2776        data: Vec<u8>,
2777    ) -> Result<RawMessage, PublishError> {
2778        match &mut self.publish_config {
2779            PublishConfig::Signing {
2780                ref keypair,
2781                author,
2782                inline_key,
2783                last_seq_no,
2784            } => {
2785                let sequence_number = last_seq_no.next();
2786
2787                let signature = {
2788                    let message = proto::Message {
2789                        from: Some(author.to_bytes()),
2790                        data: Some(data.clone()),
2791                        seqno: Some(sequence_number.to_be_bytes().to_vec()),
2792                        topic: topic.clone().into_string(),
2793                        signature: None,
2794                        key: None,
2795                    };
2796
2797                    let mut buf = Vec::with_capacity(message.get_size());
2798                    let mut writer = Writer::new(&mut buf);
2799
2800                    message
2801                        .write_message(&mut writer)
2802                        .expect("Encoding to succeed");
2803
2804                    // the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
2805                    let mut signature_bytes = SIGNING_PREFIX.to_vec();
2806                    signature_bytes.extend_from_slice(&buf);
2807                    Some(keypair.sign(&signature_bytes)?)
2808                };
2809
2810                Ok(RawMessage {
2811                    source: Some(*author),
2812                    data,
2813                    // To be interoperable with the go-implementation this is treated as a 64-bit
2814                    // big-endian uint.
2815                    sequence_number: Some(sequence_number),
2816                    topic,
2817                    signature,
2818                    key: inline_key.clone(),
2819                    validated: true, // all published messages are valid
2820                })
2821            }
2822            PublishConfig::Author(peer_id) => {
2823                Ok(RawMessage {
2824                    source: Some(*peer_id),
2825                    data,
2826                    // To be interoperable with the go-implementation this is treated as a 64-bit
2827                    // big-endian uint.
2828                    sequence_number: Some(rand::random()),
2829                    topic,
2830                    signature: None,
2831                    key: None,
2832                    validated: true, // all published messages are valid
2833                })
2834            }
2835            PublishConfig::RandomAuthor => {
2836                Ok(RawMessage {
2837                    source: Some(PeerId::random()),
2838                    data,
2839                    // To be interoperable with the go-implementation this is treated as a 64-bit
2840                    // big-endian uint.
2841                    sequence_number: Some(rand::random()),
2842                    topic,
2843                    signature: None,
2844                    key: None,
2845                    validated: true, // all published messages are valid
2846                })
2847            }
2848            PublishConfig::Anonymous => {
2849                Ok(RawMessage {
2850                    source: None,
2851                    data,
2852                    // To be interoperable with the go-implementation this is treated as a 64-bit
2853                    // big-endian uint.
2854                    sequence_number: None,
2855                    topic,
2856                    signature: None,
2857                    key: None,
2858                    validated: true, // all published messages are valid
2859                })
2860            }
2861        }
2862    }
2863
2864    /// Send a [`RpcOut`] message to a peer.
2865    ///
2866    /// Returns `true` if sending was successful, `false` otherwise.
2867    /// The method will update the peer score and failed message counter if
2868    /// sending the message failed due to the channel to the connection handler being
2869    /// full (which indicates a slow peer).
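    /// Control messages (GRAFT/PRUNE/SUBSCRIBE/UNSUBSCRIBE) are sent over an unbounded
    /// priority channel and are therefore never dropped here.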
2870    fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2871        #[cfg(feature = "metrics")]
2872        if let Some(m) = self.metrics.as_mut() {
2873            if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2874                // register bytes sent on the internal metrics.
2875                m.msg_sent(&message.topic, message.raw_protobuf_len());
2876            }
2877        }
2878
2879        let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2880            tracing::error!(peer = %peer_id,
2881                    "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2882            return false;
2883        };
2884
2885        if !matches!(peer.kind, PeerKind::Gossipsubv1_2) && matches!(rpc, RpcOut::IDontWant(..)) {
2886            tracing::trace!(peer=%peer_id, "Not sending IDONTWANT to peer as it doesn't support Gossipsub v1.2");
2887            return false;
2888        }
2889
2890        // Try sending the message to the connection handler.
2891        match peer.sender.send_message(rpc) {
2892            Ok(()) => true,
2893            Err(rpc) => {
2894                // Sending failed because the channel is full.
2895                tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2896
2897                // Update failed message counter.
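                // The counters mirror the queue the message was destined for: Publish counts
                // against the priority queue, while Forward and gossip messages count against
                // the non-priority queue.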
2898                let failed_messages = self.failed_messages.entry(peer_id).or_default();
2899                match rpc {
2900                    RpcOut::Publish { .. } => {
2901                        failed_messages.priority += 1;
2902                        failed_messages.publish += 1;
2903                    }
2904                    RpcOut::Forward { .. } => {
2905                        failed_messages.non_priority += 1;
2906                        failed_messages.forward += 1;
2907                    }
2908                    RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
2909                        failed_messages.non_priority += 1;
2910                    }
2911                    RpcOut::Graft(_)
2912                    | RpcOut::Prune(_)
2913                    | RpcOut::Subscribe(_)
2914                    | RpcOut::Unsubscribe(_) => {
2915                        unreachable!("Channel for high-priority control messages is unbounded and should always be open.")
2916                    }
2917                }
2918
2919                // Update peer score.
2920                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2921                    peer_score.failed_message_slow_peer(&peer_id);
2922                }
2923
2924                false
2925            }
2926        }
2927    }
2928
2929    fn on_connection_established(
2930        &mut self,
2931        ConnectionEstablished {
2932            peer_id,
2933            endpoint,
2934            other_established,
2935            ..
2936        }: ConnectionEstablished,
2937    ) {
2938        // Add the IP to the peer scoring system
2939        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2940            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2941                peer_score.add_ip(&peer_id, ip);
2942            } else {
2943                tracing::trace!(
2944                    peer=%peer_id,
2945                    "Couldn't extract an IP address from the peer's endpoint {:?}",
2946                    endpoint
2947                )
2948            }
2949        }
2950
2951        if other_established > 0 {
2952            return; // Not our first connection to this peer, hence nothing to do.
2953        }
2954
2955        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2956            peer_score.add_peer(peer_id);
2957        }
2958
2959        // Ignore connections from blacklisted peers.
2960        if self.blacklisted_peers.contains(&peer_id) {
2961            tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2962            return;
2963        }
2964
2965        tracing::debug!(peer=%peer_id, "New peer connected");
2966        // We need to send our subscriptions to the newly-connected node.
2967        for topic_hash in self.mesh.clone().into_keys() {
2968            self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2969        }
2970    }
2971
2972    fn on_connection_closed(
2973        &mut self,
2974        ConnectionClosed {
2975            peer_id,
2976            connection_id,
2977            endpoint,
2978            remaining_established,
2979            ..
2980        }: ConnectionClosed,
2981    ) {
2982        // Remove IP from peer scoring system
2983        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
2984            if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2985                peer_score.remove_ip(&peer_id, &ip);
2986            } else {
2987                tracing::trace!(
2988                    peer=%peer_id,
2989                    "Couldn't extract an IP address from the peer's endpoint {:?}",
2990                    endpoint
2991                )
2992            }
2993        }
2994
2995        if remaining_established != 0 {
2996            // Remove the connection from the list
2997            if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2998                let index = peer
2999                    .connections
3000                    .iter()
3001                    .position(|v| v == &connection_id)
3002                    .expect("Previously established connection to peer must be present");
3003                peer.connections.remove(index);
3004
3005                // If there are more connections and this peer is in a mesh, inform the first
3006                // connection handler.
3007                if !peer.connections.is_empty() {
3008                    for topic in &peer.topics {
3009                        if let Some(mesh_peers) = self.mesh.get(topic) {
3010                            if mesh_peers.contains(&peer_id) {
3011                                self.events.push_back(ToSwarm::NotifyHandler {
3012                                    peer_id,
3013                                    event: HandlerIn::JoinedMesh,
3014                                    handler: NotifyHandler::One(peer.connections[0]),
3015                                });
3016                                break;
3017                            }
3018                        }
3019                    }
3020                }
3021            }
3022        } else {
3023            // remove the peer from the mesh, fanout and connected peers mappings
3024            tracing::debug!(peer=%peer_id, "Peer disconnected");
3025            let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
3026                tracing::error!(peer_id = %peer_id, "Peer not found in connected peers when handling disconnection");
3027                return;
3028            };
3029
3030            // remove peer from all mappings
3031            for topic in &connected_peer.topics {
3032                // check the mesh for the topic
3033                if let Some(mesh_peers) = self.mesh.get_mut(topic) {
3034                    // check if the peer is in the mesh and remove it
3035                    if mesh_peers.remove(&peer_id) {
3036                        #[cfg(feature = "metrics")]
3037                        if let Some(m) = self.metrics.as_mut() {
3038                            m.peers_removed(topic, Churn::Dc, 1);
3039                            m.set_mesh_peers(topic, mesh_peers.len());
3040                        }
3041                    };
3042                }
3043
3044                #[cfg(feature = "metrics")]
3045                if let Some(m) = self.metrics.as_mut() {
3046                    m.dec_topic_peers(topic);
3047                }
3048
3049                // remove from fanout
3050                self.fanout
3051                    .get_mut(topic)
3052                    .map(|peers| peers.remove(&peer_id));
3053            }
3054
3055            // Forget px and outbound status for this peer
3056            self.px_peers.remove(&peer_id);
3057
3058            // If metrics are enabled, register the disconnection of a peer based on its protocol.
3059            #[cfg(feature = "metrics")]
3060            if let Some(metrics) = self.metrics.as_mut() {
3061                metrics.peer_protocol_disconnected(connected_peer.kind);
3062            }
3063
3064            self.connected_peers.remove(&peer_id);
3065
3066            if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3067                peer_score.remove_peer(&peer_id);
3068            }
3069        }
3070    }
3071
3072    fn on_address_change(
3073        &mut self,
3074        AddressChange {
3075            peer_id,
3076            old: endpoint_old,
3077            new: endpoint_new,
3078            ..
3079        }: AddressChange,
3080    ) {
3081        // Exchange IP in peer scoring system
3082        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3083            if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
3084                peer_score.remove_ip(&peer_id, &ip);
3085            } else {
3086                tracing::trace!(
3087                    peer=%&peer_id,
3088                    "Couldn't extract an IP address from the peer's old endpoint {:?}",
3089                    endpoint_old
3090                )
3091            }
3092            if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
3093                peer_score.add_ip(&peer_id, ip);
3094            } else {
3095                tracing::trace!(
3096                    peer=%peer_id,
3097                    "Couldn't extract an IP address from the peer's new endpoint {:?}",
3098                    endpoint_new
3099                )
3100            }
3101        }
3102    }
3103
3104    #[cfg(feature = "metrics")]
3105    /// Register topics to ensure metrics are recorded correctly for these topics.
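    ///
    /// For illustration only (not a doc-test); `TopicHash::from_raw` is assumed here as a way to
    /// build the hashes:
    /// ```ignore
    /// behaviour.register_topics_for_metrics(vec![TopicHash::from_raw("my-topic")]);
    /// ```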
3106    pub fn register_topics_for_metrics(&mut self, topics: Vec<TopicHash>) {
3107        if let Some(metrics) = &mut self.metrics {
3108            metrics.register_allowed_topics(topics);
3109        }
3110    }
3111}
3112
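/// Extracts the first IPv4 or IPv6 address found in a [`Multiaddr`], if any.
///
/// For illustration only (not a doc-test):
/// ```ignore
/// let addr: Multiaddr = "/ip4/192.0.2.1/tcp/4001".parse().unwrap();
/// assert!(matches!(get_ip_addr(&addr), Some(IpAddr::V4(_))));
/// ```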
3113fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
3114    addr.iter().find_map(|p| match p {
3115        Ip4(addr) => Some(IpAddr::V4(addr)),
3116        Ip6(addr) => Some(IpAddr::V6(addr)),
3117        _ => None,
3118    })
3119}
3120
3121impl<C, F> NetworkBehaviour for Behaviour<C, F>
3122where
3123    C: Send + 'static + DataTransform,
3124    F: Send + 'static + TopicSubscriptionFilter,
3125{
3126    type ConnectionHandler = Handler;
3127    type ToSwarm = Event;
3128
3129    fn handle_established_inbound_connection(
3130        &mut self,
3131        connection_id: ConnectionId,
3132        peer_id: PeerId,
3133        _: &Multiaddr,
3134        _: &Multiaddr,
3135    ) -> Result<THandler<Self>, ConnectionDenied> {
3136        // By default we assume a peer is only a floodsub peer.
3137        //
3138        // The protocol negotiation occurs once a message is sent/received. Once this happens,
3139        // we update the peer's kind in order to determine which kind of routing should be
3140        // used for it.
3141        let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3142            kind: PeerKind::Floodsub,
3143            connections: vec![],
3144            outbound: false,
3145            sender: Sender::new(self.config.connection_handler_queue_len()),
3146            topics: Default::default(),
3147            dont_send: LinkedHashMap::new(),
3148        });
3149        // Add the new connection
3150        connected_peer.connections.push(connection_id);
3151
3152        Ok(Handler::new(
3153            self.config.protocol_config(),
3154            connected_peer.sender.new_receiver(),
3155        ))
3156    }
3157
3158    fn handle_established_outbound_connection(
3159        &mut self,
3160        connection_id: ConnectionId,
3161        peer_id: PeerId,
3162        _: &Multiaddr,
3163        _: Endpoint,
3164        _: PortUse,
3165    ) -> Result<THandler<Self>, ConnectionDenied> {
3166        let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
3167            kind: PeerKind::Floodsub,
3168            connections: vec![],
3169            // Diverging from the go implementation, we only consider a peer an outbound peer
3170            // if its first connection is outbound.
3171            outbound: !self.px_peers.contains(&peer_id),
3172            sender: Sender::new(self.config.connection_handler_queue_len()),
3173            topics: Default::default(),
3174            dont_send: LinkedHashMap::new(),
3175        });
3176        // Add the new connection
3177        connected_peer.connections.push(connection_id);
3178
3179        Ok(Handler::new(
3180            self.config.protocol_config(),
3181            connected_peer.sender.new_receiver(),
3182        ))
3183    }
3184
3185    fn on_connection_handler_event(
3186        &mut self,
3187        propagation_source: PeerId,
3188        _connection_id: ConnectionId,
3189        handler_event: THandlerOutEvent<Self>,
3190    ) {
3191        match handler_event {
3192            HandlerEvent::PeerKind(kind) => {
3193                // We have identified the protocol this peer is using
3194
3195                #[cfg(feature = "metrics")]
3196                if let Some(metrics) = self.metrics.as_mut() {
3197                    metrics.peer_protocol_connected(kind);
3198                }
3199
3200                if let PeerKind::NotSupported = kind {
3201                    tracing::debug!(
3202                        peer=%propagation_source,
3203                        "Peer does not support gossipsub protocols"
3204                    );
3205                    self.events
3206                        .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3207                            peer_id: propagation_source,
3208                        }));
3209                } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3210                    // Only change the value if the old value is Floodsub (the default set in
3211                    // `handle_established_inbound_connection`/`handle_established_outbound_connection`).
3212                    // All other PeerKind changes are ignored.
3213                    tracing::debug!(
3214                        peer=%propagation_source,
3215                        peer_type=%kind,
3216                        "New peer type found for peer"
3217                    );
3218                    if let PeerKind::Floodsub = conn.kind {
3219                        conn.kind = kind;
3220                    }
3221                }
3222            }
3223            HandlerEvent::MessageDropped(rpc) => {
3224                // Account for this in the scoring logic
3225                if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3226                    peer_score.failed_message_slow_peer(&propagation_source);
3227                }
3228
3229                // Keep track of expired messages for the application layer.
3230                let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3231                failed_messages.timeout += 1;
3232                match rpc {
3233                    RpcOut::Publish { .. } => {
3234                        failed_messages.publish += 1;
3235                    }
3236                    RpcOut::Forward { .. } => {
3237                        failed_messages.forward += 1;
3238                    }
3239                    _ => {}
3240                }
3241
3242                // Record metrics on the failure.
3243                #[cfg(feature = "metrics")]
3244                if let Some(metrics) = self.metrics.as_mut() {
3245                    match rpc {
3246                        RpcOut::Publish { message, .. } => {
3247                            metrics.publish_msg_dropped(&message.topic);
3248                            metrics.timeout_msg_dropped(&message.topic);
3249                        }
3250                        RpcOut::Forward { message, .. } => {
3251                            metrics.forward_msg_dropped(&message.topic);
3252                            metrics.timeout_msg_dropped(&message.topic);
3253                        }
3254                        _ => {}
3255                    }
3256                }
3257            }
3258            HandlerEvent::Message {
3259                rpc,
3260                invalid_messages,
3261            } => {
3262                // Handle the gossipsub RPC
3263
3264                // Handle subscriptions
3265                // Update connected peers topics
3266                if !rpc.subscriptions.is_empty() {
3267                    self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3268                }
3269
3270                // Check if peer is graylisted in which case we ignore the event
3271                if let (true, _) = self
3272                    .peer_score
3273                    .below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3274                {
3275                    tracing::debug!(peer=%propagation_source, "RPC dropped from graylisted peer");
3276                    return;
3277                }
3278
3279                // Handle any invalid messages from this peer
3280                if let PeerScoreState::Active(_) = self.peer_score {
3281                    for (raw_message, validation_error) in invalid_messages {
3282                        self.handle_invalid_message(
3283                            &propagation_source,
3284                            &raw_message.topic,
3285                            None,
3286                            RejectReason::ValidationError(validation_error),
3287                        )
3288                    }
3289                } else {
3290                    // log the invalid messages
3291                    for (message, validation_error) in invalid_messages {
3292                        tracing::warn!(
3293                            peer=%propagation_source,
3294                            source=?message.source,
3295                            "Invalid message from peer. Reason: {:?}",
3296                            validation_error,
3297                        );
3298                    }
3299                }
3300
3301                // Handle messages
3302                for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3303                    // Only process the number of messages the configuration allows.
3304                    if self
3305                        .config
3306                        .max_messages_per_rpc()
3307                        .is_some_and(|max_msg| count >= max_msg)
3308                    {
3309                        tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3310                        break;
3311                    }
3312                    self.handle_received_message(raw_message, &propagation_source);
3313                }
3314
3315                // Handle control messages
3316                // Group some control messages together; this minimises the number of SendEvents
3317                // (the code is, however, simplified by handling one event at a time).
3318                let mut ihave_msgs = vec![];
3319                let mut graft_msgs = vec![];
3320                let mut prune_msgs = vec![];
3321                for (count, control_msg) in rpc.control_msgs.into_iter().enumerate() {
3322                    // Only process the number of control messages the configuration allows.
3323                    if self
3324                        .config
3325                        .max_messages_per_rpc()
3326                        .is_some_and(|max_msg| count >= max_msg)
3327                    {
3328                        tracing::warn!("Received more control messages than permitted. Ignoring further messages. Processed: {}", count);
3329                        break;
3330                    }
3331
3332                    match control_msg {
3333                        ControlAction::IHave(IHave {
3334                            topic_hash,
3335                            message_ids,
3336                        }) => {
3337                            ihave_msgs.push((topic_hash, message_ids));
3338                        }
3339                        ControlAction::IWant(IWant { message_ids }) => {
3340                            self.handle_iwant(&propagation_source, message_ids)
3341                        }
3342                        ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3343                        ControlAction::Prune(Prune {
3344                            topic_hash,
3345                            peers,
3346                            backoff,
3347                        }) => prune_msgs.push((topic_hash, peers, backoff)),
3348                        ControlAction::IDontWant(IDontWant { message_ids }) => {
3349                            let Some(peer) = self.connected_peers.get_mut(&propagation_source)
3350                            else {
3351                                tracing::error!(peer = %propagation_source,
3352                                    "Could not handle IDONTWANT, peer doesn't exist in connected peer list");
3353                                continue;
3354                            };
3355                            #[cfg(feature = "metrics")]
3356                            if let Some(metrics) = self.metrics.as_mut() {
3357                                metrics.register_idontwant(message_ids.len());
3358                            }
3359                            for message_id in message_ids {
3360                                peer.dont_send.insert(message_id, Instant::now());
3361                                // Don't exceed capacity.
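                                // `dont_send` is insertion-ordered, so popping from the front
                                // evicts the oldest IDONTWANT entry first.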
3362                                if peer.dont_send.len() > IDONTWANT_CAP {
3363                                    peer.dont_send.pop_front();
3364                                }
3365                            }
3366                        }
3367                    }
3368                }
3369                if !ihave_msgs.is_empty() {
3370                    self.handle_ihave(&propagation_source, ihave_msgs);
3371                }
3372                if !graft_msgs.is_empty() {
3373                    self.handle_graft(&propagation_source, graft_msgs);
3374                }
3375                if !prune_msgs.is_empty() {
3376                    self.handle_prune(&propagation_source, prune_msgs);
3377                }
3378            }
3379        }
3380    }
3381
3382    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3383    fn poll(
3384        &mut self,
3385        cx: &mut Context<'_>,
3386    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3387        if let Some(event) = self.events.pop_front() {
3388            return Poll::Ready(event);
3389        }
3390
3391        // update scores
3392        if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
3393            if peer_score.decay_interval.poll_unpin(cx).is_ready() {
3394                peer_score.refresh_scores();
3395                peer_score
3396                    .decay_interval
3397                    .reset(peer_score.params.decay_interval);
3398            }
3399        }
3400
3401        if self.heartbeat.poll_unpin(cx).is_ready() {
3402            self.heartbeat();
3403            self.heartbeat.reset(self.config.heartbeat_interval());
3404        }
3405
3406        Poll::Pending
3407    }
3408
3409    fn on_swarm_event(&mut self, event: FromSwarm) {
3410        match event {
3411            FromSwarm::ConnectionEstablished(connection_established) => {
3412                self.on_connection_established(connection_established)
3413            }
3414            FromSwarm::ConnectionClosed(connection_closed) => {
3415                self.on_connection_closed(connection_closed)
3416            }
3417            FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3418            _ => {}
3419        }
3420    }
3421}
3422
3423/// This is called when peers are added to any mesh. It checks if the peer already exists in
3424/// any other mesh. If this is the first mesh the peer has joined, it queues a message to notify
3425/// the appropriate connection handler to maintain a connection.
3426fn peer_added_to_mesh(
3427    peer_id: PeerId,
3428    new_topics: Vec<&TopicHash>,
3429    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3430    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3431    connections: &HashMap<PeerId, PeerDetails>,
3432) {
3433    // Ensure there is an active connection
3434    let connection_id = match connections.get(&peer_id) {
3435        Some(p) => p
3436            .connections
3437            .first()
3438            .expect("There should be at least one connection to a peer."),
3439        None => {
3440            tracing::error!(peer_id=%peer_id, "Peer not found in connected peers when added to the mesh");
3441            return;
3442        }
3443    };
3444
3445    if let Some(peer) = connections.get(&peer_id) {
3446        for topic in &peer.topics {
3447            if !new_topics.contains(&topic) {
3448                if let Some(mesh_peers) = mesh.get(topic) {
3449                    if mesh_peers.contains(&peer_id) {
3450                        // the peer is already in a mesh for another topic
3451                        return;
3452                    }
3453                }
3454            }
3455        }
3456    }
3457    // This is the first mesh the peer has joined, inform the handler
3458    events.push_back(ToSwarm::NotifyHandler {
3459        peer_id,
3460        event: HandlerIn::JoinedMesh,
3461        handler: NotifyHandler::One(*connection_id),
3462    });
3463}
3464
3465/// This is called when peers are removed from a mesh. It checks if the peer still exists in
3466/// any other mesh. If this was the last mesh the peer belonged to, it queues a message to notify
3467/// the handler that it no longer needs to maintain a connection.
3468fn peer_removed_from_mesh(
3469    peer_id: PeerId,
3470    old_topic: &TopicHash,
3471    mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3472    events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3473    connections: &HashMap<PeerId, PeerDetails>,
3474) {
3475    // Ensure there is an active connection
3476    let connection_id = match connections.get(&peer_id) {
3477        Some(p) => p
3478            .connections
3479            .first()
3480            .expect("There should be at least one connection to a peer."),
3481        None => {
3482            tracing::error!(peer_id=%peer_id, "Peer not found in connected peers when removed from the mesh");
3483            return;
3484        }
3485    };
3486
3487    if let Some(peer) = connections.get(&peer_id) {
3488        for topic in &peer.topics {
3489            if topic != old_topic {
3490                if let Some(mesh_peers) = mesh.get(topic) {
3491                    if mesh_peers.contains(&peer_id) {
3492                        // the peer exists in another mesh still
3493                        return;
3494                    }
3495                }
3496            }
3497        }
3498    }
3499    // The peer is not in any other mesh, inform the handler
3500    events.push_back(ToSwarm::NotifyHandler {
3501        peer_id,
3502        event: HandlerIn::LeftMesh,
3503        handler: NotifyHandler::One(*connection_id),
3504    });
3505}
3506
3507/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
3508/// filtered by the function `f`. The number of peers to select is given by `n_map`,
3509/// which takes the number of filtered peers as input.
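///
/// For illustration, `emit_gossip` uses
/// `n_map = |m| max(gossip_lazy, (gossip_factor * m as f64) as usize)` to size its IHAVE target set.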
3510fn get_random_peers_dynamic(
3511    connected_peers: &HashMap<PeerId, PeerDetails>,
3512    topic_hash: &TopicHash,
3513    // maps the number of total peers to the number of selected peers
3514    n_map: impl Fn(usize) -> usize,
3515    mut f: impl FnMut(&PeerId) -> bool,
3516) -> BTreeSet<PeerId> {
3517    let mut gossip_peers = connected_peers
3518        .iter()
3519        .filter(|(_, p)| p.topics.contains(topic_hash))
3520        .filter(|(peer_id, _)| f(peer_id))
3521        .filter(|(_, p)| p.kind.is_gossipsub())
3522        .map(|(peer_id, _)| *peer_id)
3523        .collect::<Vec<PeerId>>();
3524
3525    // if we have fewer peers than needed, return them all
3526    let n = n_map(gossip_peers.len());
3527    if gossip_peers.len() <= n {
3528        tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3529        return gossip_peers.into_iter().collect();
3530    }
3531
3532    // we have more peers than needed, shuffle them and return n of them
3533    let mut rng = thread_rng();
3534    gossip_peers.partial_shuffle(&mut rng, n);
3535
3536    tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3537
3538    gossip_peers.into_iter().take(n).collect()
3539}
3540
3541/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3542/// filtered by the function `f`.
3543fn get_random_peers(
3544    connected_peers: &HashMap<PeerId, PeerDetails>,
3545    topic_hash: &TopicHash,
3546    n: usize,
3547    f: impl FnMut(&PeerId) -> bool,
3548) -> BTreeSet<PeerId> {
3549    get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3550}
3551
3552/// Validates the combination of signing, privacy and message validation to ensure the
3553/// configuration will not reject published messages.
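///
/// For illustration: a signing authenticity (e.g. `MessageAuthenticity::Signed`) pairs with
/// `ValidationMode::Strict`, while `MessageAuthenticity::Anonymous` pairs with
/// `ValidationMode::Anonymous`; mixing the two combinations is rejected below.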
3554fn validate_config(
3555    authenticity: &MessageAuthenticity,
3556    validation_mode: &ValidationMode,
3557) -> Result<(), &'static str> {
3558    match validation_mode {
3559        ValidationMode::Anonymous => {
3560            if authenticity.is_signing() {
3561                return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3562            }
3563
3564            if !authenticity.is_anonymous() {
3565                return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3566            }
3567        }
3568        ValidationMode::Strict => {
3569            if !authenticity.is_signing() {
3570                return Err(
3571                    "Messages will be published unsigned and incoming unsigned messages will \
3572                     be rejected. Consider adjusting the validation or privacy settings in the \
3573                     config"
3574                );
3575            }
3576        }
3577        _ => {}
3578    }
3579    Ok(())
3580}
3581
3582impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3583    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3584        f.debug_struct("Behaviour")
3585            .field("config", &self.config)
3586            .field("events", &self.events.len())
3587            .field("publish_config", &self.publish_config)
3588            .field("mesh", &self.mesh)
3589            .field("fanout", &self.fanout)
3590            .field("fanout_last_pub", &self.fanout_last_pub)
3591            .field("mcache", &self.mcache)
3592            .field("heartbeat", &self.heartbeat)
3593            .finish()
3594    }
3595}
3596
3597impl fmt::Debug for PublishConfig {
3598    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3599        match self {
3600            PublishConfig::Signing { author, .. } => {
3601                f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3602            }
3603            PublishConfig::Author(author) => {
3604                f.write_fmt(format_args!("PublishConfig::Author({author})"))
3605            }
3606            PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3607            PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3608        }
3609    }
3610}