1use ant_libp2p_core as libp2p_core;
22use ant_libp2p_swarm as libp2p_swarm;
23
24use std::{
25 cmp::{max, Ordering, Ordering::Equal},
26 collections::{BTreeSet, HashMap, HashSet, VecDeque},
27 fmt,
28 fmt::Debug,
29 net::IpAddr,
30 task::{Context, Poll},
31 time::Duration,
32};
33
34use futures::FutureExt;
35use futures_timer::Delay;
36use libp2p_core::{
37 multiaddr::Protocol::{Ip4, Ip6},
38 transport::PortUse,
39 Endpoint, Multiaddr,
40};
41use libp2p_identity::{Keypair, PeerId};
42use libp2p_swarm::{
43 behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
44 dial_opts::DialOpts,
45 ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent,
46 THandlerOutEvent, ToSwarm,
47};
48use prometheus_client::registry::Registry;
49use quick_protobuf::{MessageWrite, Writer};
50use rand::{seq::SliceRandom, thread_rng};
51use web_time::{Instant, SystemTime};
52
53use crate::{
54 backoff::BackoffStorage,
55 config::{Config, ValidationMode},
56 gossip_promises::GossipPromises,
57 handler::{Handler, HandlerEvent, HandlerIn},
58 mcache::MessageCache,
59 metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
60 peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason},
61 protocol::SIGNING_PREFIX,
62 rpc::Sender,
63 rpc_proto::proto,
64 subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
65 time_cache::DuplicateCache,
66 topic::{Hasher, Topic, TopicHash},
67 transform::{DataTransform, IdentityTransform},
68 types::{
69 ControlAction, Graft, IHave, IWant, Message, MessageAcceptance, MessageId, PeerConnections,
70 PeerInfo, PeerKind, Prune, RawMessage, RpcOut, Subscription, SubscriptionAction,
71 },
72 FailedMessages, PublishError, SubscriptionError, TopicScoreParams, ValidationError,
73};
74
75#[cfg(test)]
76mod tests;
77
78#[derive(Clone)]
85pub enum MessageAuthenticity {
86 Signed(Keypair),
89 Author(PeerId),
94 RandomAuthor,
99 Anonymous,
109}
110
111impl MessageAuthenticity {
112 pub fn is_signing(&self) -> bool {
114 matches!(self, MessageAuthenticity::Signed(_))
115 }
116
117 pub fn is_anonymous(&self) -> bool {
118 matches!(self, MessageAuthenticity::Anonymous)
119 }
120}
121
122#[derive(Debug)]
124pub enum Event {
125 Message {
127 propagation_source: PeerId,
129 message_id: MessageId,
132 message: Message,
134 },
135 Subscribed {
137 peer_id: PeerId,
139 topic: TopicHash,
141 },
142 Unsubscribed {
144 peer_id: PeerId,
146 topic: TopicHash,
148 },
149 GossipsubNotSupported { peer_id: PeerId },
151 SlowPeer {
153 peer_id: PeerId,
155 failed_messages: FailedMessages,
157 },
158}
159
160#[allow(clippy::large_enum_variant)]
163enum PublishConfig {
164 Signing {
165 keypair: Keypair,
166 author: PeerId,
167 inline_key: Option<Vec<u8>>,
168 last_seq_no: SequenceNumber,
169 },
170 Author(PeerId),
171 RandomAuthor,
172 Anonymous,
173}
174
175#[derive(Debug)]
179struct SequenceNumber(u64);
180
181impl SequenceNumber {
182 fn new() -> Self {
183 let unix_timestamp = SystemTime::now()
184 .duration_since(SystemTime::UNIX_EPOCH)
185 .expect("time to be linear")
186 .as_nanos();
187
188 Self(unix_timestamp as u64)
189 }
190
191 fn next(&mut self) -> u64 {
192 self.0 = self
193 .0
194 .checked_add(1)
195 .expect("to not exhaust u64 space for sequence numbers");
196
197 self.0
198 }
199}
200
201impl PublishConfig {
202 pub(crate) fn get_own_id(&self) -> Option<&PeerId> {
203 match self {
204 Self::Signing { author, .. } => Some(author),
205 Self::Author(author) => Some(author),
206 _ => None,
207 }
208 }
209}
210
211impl From<MessageAuthenticity> for PublishConfig {
212 fn from(authenticity: MessageAuthenticity) -> Self {
213 match authenticity {
214 MessageAuthenticity::Signed(keypair) => {
215 let public_key = keypair.public();
216 let key_enc = public_key.encode_protobuf();
217 let key = if key_enc.len() <= 42 {
218 None
222 } else {
223 Some(key_enc)
225 };
226
227 PublishConfig::Signing {
228 keypair,
229 author: public_key.to_peer_id(),
230 inline_key: key,
231 last_seq_no: SequenceNumber::new(),
232 }
233 }
234 MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
235 MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
236 MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
237 }
238 }
239}
240
241pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
253 config: Config,
255
256 events: VecDeque<ToSwarm<Event, HandlerIn>>,
258
259 publish_config: PublishConfig,
261
262 duplicate_cache: DuplicateCache<MessageId>,
265
266 connected_peers: HashMap<PeerId, PeerConnections>,
269
270 explicit_peers: HashSet<PeerId>,
273
274 blacklisted_peers: HashSet<PeerId>,
277
278 mesh: HashMap<TopicHash, BTreeSet<PeerId>>,
280
281 fanout: HashMap<TopicHash, BTreeSet<PeerId>>,
283
284 fanout_last_pub: HashMap<TopicHash, Instant>,
286
287 backoffs: BackoffStorage,
289
290 mcache: MessageCache,
292
293 heartbeat: Delay,
295
296 heartbeat_ticks: u64,
299
300 px_peers: HashSet<PeerId>,
305
306 outbound_peers: HashSet<PeerId>,
309
310 peer_score: Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
313
314 count_received_ihave: HashMap<PeerId, usize>,
316
317 count_sent_iwant: HashMap<PeerId, usize>,
319
320 published_message_ids: DuplicateCache<MessageId>,
323
324 subscription_filter: F,
326
327 data_transform: D,
331
332 metrics: Option<Metrics>,
334
335 failed_messages: HashMap<PeerId, FailedMessages>,
337}
338
339impl<D, F> Behaviour<D, F>
340where
341 D: DataTransform + Default,
342 F: TopicSubscriptionFilter + Default,
343{
344 pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
347 Self::new_with_subscription_filter_and_transform(
348 privacy,
349 config,
350 None,
351 F::default(),
352 D::default(),
353 )
354 }
355
356 pub fn new_with_metrics(
360 privacy: MessageAuthenticity,
361 config: Config,
362 metrics_registry: &mut Registry,
363 metrics_config: MetricsConfig,
364 ) -> Result<Self, &'static str> {
365 Self::new_with_subscription_filter_and_transform(
366 privacy,
367 config,
368 Some((metrics_registry, metrics_config)),
369 F::default(),
370 D::default(),
371 )
372 }
373}
374
375impl<D, F> Behaviour<D, F>
376where
377 D: DataTransform + Default,
378 F: TopicSubscriptionFilter,
379{
380 pub fn new_with_subscription_filter(
383 privacy: MessageAuthenticity,
384 config: Config,
385 metrics: Option<(&mut Registry, MetricsConfig)>,
386 subscription_filter: F,
387 ) -> Result<Self, &'static str> {
388 Self::new_with_subscription_filter_and_transform(
389 privacy,
390 config,
391 metrics,
392 subscription_filter,
393 D::default(),
394 )
395 }
396}
397
398impl<D, F> Behaviour<D, F>
399where
400 D: DataTransform,
401 F: TopicSubscriptionFilter + Default,
402{
403 pub fn new_with_transform(
406 privacy: MessageAuthenticity,
407 config: Config,
408 metrics: Option<(&mut Registry, MetricsConfig)>,
409 data_transform: D,
410 ) -> Result<Self, &'static str> {
411 Self::new_with_subscription_filter_and_transform(
412 privacy,
413 config,
414 metrics,
415 F::default(),
416 data_transform,
417 )
418 }
419}
420
421impl<D, F> Behaviour<D, F>
422where
423 D: DataTransform,
424 F: TopicSubscriptionFilter,
425{
426 pub fn new_with_subscription_filter_and_transform(
429 privacy: MessageAuthenticity,
430 config: Config,
431 metrics: Option<(&mut Registry, MetricsConfig)>,
432 subscription_filter: F,
433 data_transform: D,
434 ) -> Result<Self, &'static str> {
435 validate_config(&privacy, config.validation_mode())?;
440
441 Ok(Behaviour {
442 metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
443 events: VecDeque::new(),
444 publish_config: privacy.into(),
445 duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
446 explicit_peers: HashSet::new(),
447 blacklisted_peers: HashSet::new(),
448 mesh: HashMap::new(),
449 fanout: HashMap::new(),
450 fanout_last_pub: HashMap::new(),
451 backoffs: BackoffStorage::new(
452 &config.prune_backoff(),
453 config.heartbeat_interval(),
454 config.backoff_slack(),
455 ),
456 mcache: MessageCache::new(config.history_gossip(), config.history_length()),
457 heartbeat: Delay::new(config.heartbeat_interval() + config.heartbeat_initial_delay()),
458 heartbeat_ticks: 0,
459 px_peers: HashSet::new(),
460 outbound_peers: HashSet::new(),
461 peer_score: None,
462 count_received_ihave: HashMap::new(),
463 count_sent_iwant: HashMap::new(),
464 connected_peers: HashMap::new(),
465 published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
466 config,
467 subscription_filter,
468 data_transform,
469 failed_messages: Default::default(),
470 })
471 }
472}
473
474impl<D, F> Behaviour<D, F>
475where
476 D: DataTransform + Send + 'static,
477 F: TopicSubscriptionFilter + Send + 'static,
478{
479 pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
481 self.mesh.keys()
482 }
483
484 pub fn mesh_peers(&self, topic_hash: &TopicHash) -> impl Iterator<Item = &PeerId> {
486 self.mesh.get(topic_hash).into_iter().flat_map(|x| x.iter())
487 }
488
489 pub fn all_mesh_peers(&self) -> impl Iterator<Item = &PeerId> {
490 let mut res = BTreeSet::new();
491 for peers in self.mesh.values() {
492 res.extend(peers);
493 }
494 res.into_iter()
495 }
496
497 pub fn all_peers(&self) -> impl Iterator<Item = (&PeerId, Vec<&TopicHash>)> {
499 self.connected_peers
500 .iter()
501 .map(|(peer_id, peer)| (peer_id, peer.topics.iter().collect()))
502 }
503
504 pub fn peer_protocol(&self) -> impl Iterator<Item = (&PeerId, &PeerKind)> {
506 self.connected_peers.iter().map(|(k, v)| (k, &v.kind))
507 }
508
509 pub fn peer_score(&self, peer_id: &PeerId) -> Option<f64> {
511 self.peer_score
512 .as_ref()
513 .map(|(score, ..)| score.score(peer_id))
514 }
515
516 pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
521 tracing::debug!(%topic, "Subscribing to topic");
522 let topic_hash = topic.hash();
523 if !self.subscription_filter.can_subscribe(&topic_hash) {
524 return Err(SubscriptionError::NotAllowed);
525 }
526
527 if self.mesh.contains_key(&topic_hash) {
528 tracing::debug!(%topic, "Topic is already in the mesh");
529 return Ok(false);
530 }
531
532 for peer_id in self.connected_peers.keys().copied().collect::<Vec<_>>() {
534 tracing::debug!(%peer_id, "Sending SUBSCRIBE to peer");
535 let event = RpcOut::Subscribe(topic_hash.clone());
536 self.send_message(peer_id, event);
537 }
538
539 self.join(&topic_hash);
542 tracing::debug!(%topic, "Subscribed to topic");
543 Ok(true)
544 }
545
546 pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> bool {
550 tracing::debug!(%topic, "Unsubscribing from topic");
551 let topic_hash = topic.hash();
552
553 if !self.mesh.contains_key(&topic_hash) {
554 tracing::debug!(topic=%topic_hash, "Already unsubscribed from topic");
555 return false;
557 }
558
559 for peer in self.connected_peers.keys().copied().collect::<Vec<_>>() {
561 tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
562 let event = RpcOut::Unsubscribe(topic_hash.clone());
563 self.send_message(peer, event);
564 }
565
566 self.leave(&topic_hash);
569
570 tracing::debug!(topic=%topic_hash, "Unsubscribed from topic");
571 true
572 }
573
574 pub fn publish(
576 &mut self,
577 topic: impl Into<TopicHash>,
578 data: impl Into<Vec<u8>>,
579 ) -> Result<MessageId, PublishError> {
580 let data = data.into();
581 let topic = topic.into();
582
583 let transformed_data = self
585 .data_transform
586 .outbound_transform(&topic, data.clone())?;
587
588 if transformed_data.len() > self.config.max_transmit_size() {
590 return Err(PublishError::MessageTooLarge);
591 }
592
593 let raw_message = self.build_raw_message(topic, transformed_data)?;
594
595 let msg_id = self.config.message_id(&Message {
597 source: raw_message.source,
598 data, sequence_number: raw_message.sequence_number,
600 topic: raw_message.topic.clone(),
601 });
602
603 if self.duplicate_cache.contains(&msg_id) {
605 tracing::warn!(
608 message=%msg_id,
609 "Not publishing a message that has already been published"
610 );
611 return Err(PublishError::Duplicate);
612 }
613
614 tracing::trace!(message=%msg_id, "Publishing message");
615
616 let topic_hash = raw_message.topic.clone();
617
618 let mut peers_on_topic = self
619 .connected_peers
620 .iter()
621 .filter(|(_, p)| p.topics.contains(&topic_hash))
622 .map(|(peer_id, _)| peer_id)
623 .peekable();
624
625 if peers_on_topic.peek().is_none() {
626 return Err(PublishError::InsufficientPeers);
627 }
628
629 let mut recipient_peers = HashSet::new();
630 if self.config.flood_publish() {
631 recipient_peers.extend(peers_on_topic.filter(|p| {
633 self.explicit_peers.contains(*p)
634 || !self.score_below_threshold(p, |ts| ts.publish_threshold).0
635 }));
636 } else {
637 match self.mesh.get(&topic_hash) {
638 Some(mesh_peers) => {
640 let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
643
644 if needed_extra_peers > 0 {
645 let peer_list = get_random_peers(
650 &self.connected_peers,
651 &topic_hash,
652 needed_extra_peers,
653 |peer| {
654 !mesh_peers.contains(peer)
655 && !self.explicit_peers.contains(peer)
656 && !self
657 .score_below_threshold(peer, |pst| pst.publish_threshold)
658 .0
659 },
660 );
661 recipient_peers.extend(peer_list);
662 }
663
664 recipient_peers.extend(mesh_peers);
665 }
666 None => {
668 tracing::debug!(topic=%topic_hash, "Topic not in the mesh");
669 if self.fanout.contains_key(&topic_hash) {
671 for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
672 recipient_peers.insert(*peer);
673 }
674 } else {
675 let mesh_n = self.config.mesh_n();
677 let new_peers =
678 get_random_peers(&self.connected_peers, &topic_hash, mesh_n, {
679 |p| {
680 !self.explicit_peers.contains(p)
681 && !self
682 .score_below_threshold(p, |pst| pst.publish_threshold)
683 .0
684 }
685 });
686 self.fanout.insert(topic_hash.clone(), new_peers.clone());
688 for peer in new_peers {
689 tracing::debug!(%peer, "Peer added to fanout");
690 recipient_peers.insert(peer);
691 }
692 }
693 self.fanout_last_pub
695 .insert(topic_hash.clone(), Instant::now());
696 }
697 }
698
699 recipient_peers
701 .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id)));
702
703 for (peer, connections) in &self.connected_peers {
705 if connections.kind == PeerKind::Floodsub
706 && !self
707 .score_below_threshold(peer, |ts| ts.publish_threshold)
708 .0
709 {
710 recipient_peers.insert(*peer);
711 }
712 }
713 }
714
715 self.duplicate_cache.insert(msg_id.clone());
718 self.mcache.put(&msg_id, raw_message.clone());
719
720 if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
723 if !self.config.allow_self_origin() {
724 self.published_message_ids.insert(msg_id.clone());
725 }
726 }
727
728 let mut publish_failed = true;
730 for peer_id in recipient_peers.iter() {
731 tracing::trace!(peer=%peer_id, "Sending message to peer");
732 if self.send_message(
733 *peer_id,
734 RpcOut::Publish {
735 message: raw_message.clone(),
736 timeout: Delay::new(self.config.publish_queue_duration()),
737 },
738 ) {
739 publish_failed = false
740 }
741 }
742
743 if recipient_peers.is_empty() {
744 return Err(PublishError::InsufficientPeers);
745 }
746
747 if publish_failed {
748 return Err(PublishError::AllQueuesFull(recipient_peers.len()));
749 }
750
751 tracing::debug!(message=%msg_id, "Published message");
752
753 if let Some(metrics) = self.metrics.as_mut() {
754 metrics.register_published_message(&topic_hash);
755 }
756
757 Ok(msg_id)
758 }
759
760 pub fn report_message_validation_result(
780 &mut self,
781 msg_id: &MessageId,
782 propagation_source: &PeerId,
783 acceptance: MessageAcceptance,
784 ) -> bool {
785 let reject_reason = match acceptance {
786 MessageAcceptance::Accept => {
787 let (raw_message, originating_peers) = match self.mcache.validate(msg_id) {
788 Some((raw_message, originating_peers)) => {
789 (raw_message.clone(), originating_peers)
790 }
791 None => {
792 tracing::warn!(
793 message=%msg_id,
794 "Message not in cache. Ignoring forwarding"
795 );
796 if let Some(metrics) = self.metrics.as_mut() {
797 metrics.memcache_miss();
798 }
799 return false;
800 }
801 };
802
803 if let Some(metrics) = self.metrics.as_mut() {
804 metrics.register_msg_validation(&raw_message.topic, &acceptance);
805 }
806
807 self.forward_msg(
808 msg_id,
809 raw_message,
810 Some(propagation_source),
811 originating_peers,
812 );
813 return true;
814 }
815 MessageAcceptance::Reject => RejectReason::ValidationFailed,
816 MessageAcceptance::Ignore => RejectReason::ValidationIgnored,
817 };
818
819 if let Some((raw_message, originating_peers)) = self.mcache.remove(msg_id) {
820 if let Some(metrics) = self.metrics.as_mut() {
821 metrics.register_msg_validation(&raw_message.topic, &acceptance);
822 }
823
824 if let Some((peer_score, ..)) = &mut self.peer_score {
827 peer_score.reject_message(
828 propagation_source,
829 msg_id,
830 &raw_message.topic,
831 reject_reason,
832 );
833 for peer in originating_peers.iter() {
834 peer_score.reject_message(peer, msg_id, &raw_message.topic, reject_reason);
835 }
836 }
837 true
838 } else {
839 tracing::warn!(message=%msg_id, "Rejected message not in cache");
840 false
841 }
842 }
843
844 pub fn add_explicit_peer(&mut self, peer_id: &PeerId) {
846 tracing::debug!(peer=%peer_id, "Adding explicit peer");
847
848 self.explicit_peers.insert(*peer_id);
849
850 self.check_explicit_peer_connection(peer_id);
851 }
852
853 pub fn remove_explicit_peer(&mut self, peer_id: &PeerId) {
856 tracing::debug!(peer=%peer_id, "Removing explicit peer");
857 self.explicit_peers.remove(peer_id);
858 }
859
860 pub fn blacklist_peer(&mut self, peer_id: &PeerId) {
863 if self.blacklisted_peers.insert(*peer_id) {
864 tracing::debug!(peer=%peer_id, "Peer has been blacklisted");
865 }
866 }
867
868 pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
870 if self.blacklisted_peers.remove(peer_id) {
871 tracing::debug!(peer=%peer_id, "Peer has been removed from the blacklist");
872 }
873 }
874
875 pub fn with_peer_score(
879 &mut self,
880 params: PeerScoreParams,
881 threshold: PeerScoreThresholds,
882 ) -> Result<(), String> {
883 self.with_peer_score_and_message_delivery_time_callback(params, threshold, None)
884 }
885
886 pub fn with_peer_score_and_message_delivery_time_callback(
889 &mut self,
890 params: PeerScoreParams,
891 threshold: PeerScoreThresholds,
892 callback: Option<fn(&PeerId, &TopicHash, f64)>,
893 ) -> Result<(), String> {
894 params.validate()?;
895 threshold.validate()?;
896
897 if self.peer_score.is_some() {
898 return Err("Peer score set twice".into());
899 }
900
901 let interval = Delay::new(params.decay_interval);
902 let peer_score = PeerScore::new_with_message_delivery_time_callback(params, callback);
903 self.peer_score = Some((peer_score, threshold, interval, GossipPromises::default()));
904 Ok(())
905 }
906
907 pub fn set_topic_params<H: Hasher>(
911 &mut self,
912 topic: Topic<H>,
913 params: TopicScoreParams,
914 ) -> Result<(), &'static str> {
915 if let Some((peer_score, ..)) = &mut self.peer_score {
916 peer_score.set_topic_params(topic.hash(), params);
917 Ok(())
918 } else {
919 Err("Peer score must be initialised with `with_peer_score()`")
920 }
921 }
922
923 pub fn get_topic_params<H: Hasher>(&self, topic: &Topic<H>) -> Option<&TopicScoreParams> {
925 self.peer_score.as_ref()?.0.get_topic_params(&topic.hash())
926 }
927
928 pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
931 if let Some((peer_score, ..)) = &mut self.peer_score {
932 peer_score.set_application_score(peer_id, new_score)
933 } else {
934 false
935 }
936 }
937
938 fn join(&mut self, topic_hash: &TopicHash) {
940 tracing::debug!(topic=%topic_hash, "Running JOIN for topic");
941
942 if self.mesh.contains_key(topic_hash) {
944 tracing::debug!(topic=%topic_hash, "JOIN: The topic is already in the mesh, ignoring JOIN");
945 return;
946 }
947
948 let mut added_peers = HashSet::new();
949
950 if let Some(m) = self.metrics.as_mut() {
951 m.joined(topic_hash)
952 }
953
954 if let Some((_, mut peers)) = self.fanout.remove_entry(topic_hash) {
957 tracing::debug!(
958 topic=%topic_hash,
959 "JOIN: Removing peers from the fanout for topic"
960 );
961
962 peers.retain(|p| {
964 !self.explicit_peers.contains(p)
965 && !self.score_below_threshold(p, |_| 0.0).0
966 && !self.backoffs.is_backoff_with_slack(topic_hash, p)
967 });
968
969 let add_peers = std::cmp::min(peers.len(), self.config.mesh_n());
972 tracing::debug!(
973 topic=%topic_hash,
974 "JOIN: Adding {:?} peers from the fanout for topic",
975 add_peers
976 );
977 added_peers.extend(peers.iter().take(add_peers));
978
979 self.mesh.insert(
980 topic_hash.clone(),
981 peers.into_iter().take(add_peers).collect(),
982 );
983
984 self.fanout_last_pub.remove(topic_hash);
986 }
987
988 let fanaout_added = added_peers.len();
989 if let Some(m) = self.metrics.as_mut() {
990 m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added)
991 }
992
993 if added_peers.len() < self.config.mesh_n() {
995 let new_peers = get_random_peers(
997 &self.connected_peers,
998 topic_hash,
999 self.config.mesh_n() - added_peers.len(),
1000 |peer| {
1001 !added_peers.contains(peer)
1002 && !self.explicit_peers.contains(peer)
1003 && !self.score_below_threshold(peer, |_| 0.0).0
1004 && !self.backoffs.is_backoff_with_slack(topic_hash, peer)
1005 },
1006 );
1007 added_peers.extend(new_peers.clone());
1008 tracing::debug!(
1010 "JOIN: Inserting {:?} random peers into the mesh",
1011 new_peers.len()
1012 );
1013 let mesh_peers = self.mesh.entry(topic_hash.clone()).or_default();
1014 mesh_peers.extend(new_peers);
1015 }
1016
1017 let random_added = added_peers.len() - fanaout_added;
1018 if let Some(m) = self.metrics.as_mut() {
1019 m.peers_included(topic_hash, Inclusion::Random, random_added)
1020 }
1021
1022 for peer_id in added_peers {
1023 tracing::debug!(peer=%peer_id, "JOIN: Sending Graft message to peer");
1025 if let Some((peer_score, ..)) = &mut self.peer_score {
1026 peer_score.graft(&peer_id, topic_hash.clone());
1027 }
1028 self.send_message(
1029 peer_id,
1030 RpcOut::Graft(Graft {
1031 topic_hash: topic_hash.clone(),
1032 }),
1033 );
1034
1035 peer_added_to_mesh(
1037 peer_id,
1038 vec![topic_hash],
1039 &self.mesh,
1040 &mut self.events,
1041 &self.connected_peers,
1042 );
1043 }
1044
1045 let mesh_peers = self.mesh_peers(topic_hash).count();
1046 if let Some(m) = self.metrics.as_mut() {
1047 m.set_mesh_peers(topic_hash, mesh_peers)
1048 }
1049
1050 tracing::debug!(topic=%topic_hash, "Completed JOIN for topic");
1051 }
1052
1053 fn make_prune(
1055 &mut self,
1056 topic_hash: &TopicHash,
1057 peer: &PeerId,
1058 do_px: bool,
1059 on_unsubscribe: bool,
1060 ) -> Prune {
1061 if let Some((peer_score, ..)) = &mut self.peer_score {
1062 peer_score.prune(peer, topic_hash.clone());
1063 }
1064
1065 match self.connected_peers.get(peer).map(|v| &v.kind) {
1066 Some(PeerKind::Floodsub) => {
1067 tracing::error!("Attempted to prune a Floodsub peer");
1068 }
1069 Some(PeerKind::Gossipsub) => {
1070 return Prune {
1072 topic_hash: topic_hash.clone(),
1073 peers: Vec::new(),
1074 backoff: None,
1075 };
1076 }
1077 None => {
1078 tracing::error!("Attempted to Prune an unknown peer");
1079 }
1080 _ => {} }
1082
1083 let peers = if do_px {
1085 get_random_peers(
1086 &self.connected_peers,
1087 topic_hash,
1088 self.config.prune_peers(),
1089 |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0,
1090 )
1091 .into_iter()
1092 .map(|p| PeerInfo { peer_id: Some(p) })
1093 .collect()
1094 } else {
1095 Vec::new()
1096 };
1097
1098 let backoff = if on_unsubscribe {
1099 self.config.unsubscribe_backoff()
1100 } else {
1101 self.config.prune_backoff()
1102 };
1103
1104 self.backoffs.update_backoff(topic_hash, peer, backoff);
1106
1107 Prune {
1108 topic_hash: topic_hash.clone(),
1109 peers,
1110 backoff: Some(backoff.as_secs()),
1111 }
1112 }
1113
1114 fn leave(&mut self, topic_hash: &TopicHash) {
1116 tracing::debug!(topic=%topic_hash, "Running LEAVE for topic");
1117
1118 if let Some((_, peers)) = self.mesh.remove_entry(topic_hash) {
1120 if let Some(m) = self.metrics.as_mut() {
1121 m.left(topic_hash)
1122 }
1123 for peer_id in peers {
1124 tracing::debug!(%peer_id, "LEAVE: Sending PRUNE to peer");
1126
1127 let on_unsubscribe = true;
1128 let prune =
1129 self.make_prune(topic_hash, &peer_id, self.config.do_px(), on_unsubscribe);
1130 self.send_message(peer_id, RpcOut::Prune(prune));
1131
1132 peer_removed_from_mesh(
1134 peer_id,
1135 topic_hash,
1136 &self.mesh,
1137 &mut self.events,
1138 &self.connected_peers,
1139 );
1140 }
1141 }
1142 tracing::debug!(topic=%topic_hash, "Completed LEAVE for topic");
1143 }
1144
1145 fn check_explicit_peer_connection(&mut self, peer_id: &PeerId) {
1147 if !self.connected_peers.contains_key(peer_id) {
1148 tracing::debug!(peer=%peer_id, "Connecting to explicit peer");
1150 self.events.push_back(ToSwarm::Dial {
1151 opts: DialOpts::peer_id(*peer_id).build(),
1152 });
1153 }
1154 }
1155
1156 fn score_below_threshold(
1159 &self,
1160 peer_id: &PeerId,
1161 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1162 ) -> (bool, f64) {
1163 Self::score_below_threshold_from_scores(&self.peer_score, peer_id, threshold)
1164 }
1165
1166 fn score_below_threshold_from_scores(
1167 peer_score: &Option<(PeerScore, PeerScoreThresholds, Delay, GossipPromises)>,
1168 peer_id: &PeerId,
1169 threshold: impl Fn(&PeerScoreThresholds) -> f64,
1170 ) -> (bool, f64) {
1171 if let Some((peer_score, thresholds, ..)) = peer_score {
1172 let score = peer_score.score(peer_id);
1173 if score < threshold(thresholds) {
1174 return (true, score);
1175 }
1176 (false, score)
1177 } else {
1178 (false, 0.0)
1179 }
1180 }
1181
1182 fn handle_ihave(&mut self, peer_id: &PeerId, ihave_msgs: Vec<(TopicHash, Vec<MessageId>)>) {
1185 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1187 tracing::debug!(
1188 peer=%peer_id,
1189 %score,
1190 "IHAVE: ignoring peer with score below threshold"
1191 );
1192 return;
1193 }
1194
1195 let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0);
1197 *peer_have += 1;
1198 if *peer_have > self.config.max_ihave_messages() {
1199 tracing::debug!(
1200 peer=%peer_id,
1201 "IHAVE: peer has advertised too many times ({}) within this heartbeat \
1202 interval; ignoring",
1203 *peer_have
1204 );
1205 return;
1206 }
1207
1208 if let Some(iasked) = self.count_sent_iwant.get(peer_id) {
1209 if *iasked >= self.config.max_ihave_length() {
1210 tracing::debug!(
1211 peer=%peer_id,
1212 "IHAVE: peer has already advertised too many messages ({}); ignoring",
1213 *iasked
1214 );
1215 return;
1216 }
1217 }
1218
1219 tracing::trace!(peer=%peer_id, "Handling IHAVE for peer");
1220
1221 let mut iwant_ids = HashSet::new();
1222
1223 let want_message = |id: &MessageId| {
1224 if self.duplicate_cache.contains(id) {
1225 return false;
1226 }
1227
1228 self.peer_score
1229 .as_ref()
1230 .map(|(_, _, _, promises)| !promises.contains(id))
1231 .unwrap_or(true)
1232 };
1233
1234 for (topic, ids) in ihave_msgs {
1235 if !self.mesh.contains_key(&topic) {
1237 tracing::debug!(
1238 %topic,
1239 "IHAVE: Ignoring IHAVE - Not subscribed to topic"
1240 );
1241 continue;
1242 }
1243
1244 for id in ids.into_iter().filter(want_message) {
1245 if iwant_ids.insert(id) {
1247 if let Some(metrics) = self.metrics.as_mut() {
1249 metrics.register_iwant(&topic);
1250 }
1251 }
1252 }
1253 }
1254
1255 if !iwant_ids.is_empty() {
1256 let iasked = self.count_sent_iwant.entry(*peer_id).or_insert(0);
1257 let mut iask = iwant_ids.len();
1258 if *iasked + iask > self.config.max_ihave_length() {
1259 iask = self.config.max_ihave_length().saturating_sub(*iasked);
1260 }
1261
1262 tracing::debug!(
1264 peer=%peer_id,
1265 "IHAVE: Asking for {} out of {} messages from peer",
1266 iask,
1267 iwant_ids.len()
1268 );
1269
1270 let mut iwant_ids_vec: Vec<_> = iwant_ids.into_iter().collect();
1272 let mut rng = thread_rng();
1273 iwant_ids_vec.partial_shuffle(&mut rng, iask);
1274
1275 iwant_ids_vec.truncate(iask);
1276 *iasked += iask;
1277
1278 if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
1279 gossip_promises.add_promise(
1280 *peer_id,
1281 &iwant_ids_vec,
1282 Instant::now() + self.config.iwant_followup_time(),
1283 );
1284 }
1285 tracing::trace!(
1286 peer=%peer_id,
1287 "IHAVE: Asking for the following messages from peer: {:?}",
1288 iwant_ids_vec
1289 );
1290
1291 self.send_message(
1292 *peer_id,
1293 RpcOut::IWant(IWant {
1294 message_ids: iwant_ids_vec,
1295 }),
1296 );
1297 }
1298 tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
1299 }
1300
1301 fn handle_iwant(&mut self, peer_id: &PeerId, iwant_msgs: Vec<MessageId>) {
1304 if let (true, score) = self.score_below_threshold(peer_id, |pst| pst.gossip_threshold) {
1306 tracing::debug!(
1307 peer=%peer_id,
1308 "IWANT: ignoring peer with score below threshold [score = {}]",
1309 score
1310 );
1311 return;
1312 }
1313
1314 tracing::debug!(peer=%peer_id, "Handling IWANT for peer");
1315
1316 for id in iwant_msgs {
1317 if let Some((msg, count)) = self
1320 .mcache
1321 .get_with_iwant_counts(&id, peer_id)
1322 .map(|(msg, count)| (msg.clone(), count))
1323 {
1324 if count > self.config.gossip_retransimission() {
1325 tracing::debug!(
1326 peer=%peer_id,
1327 message=%id,
1328 "IWANT: Peer has asked for message too many times; ignoring request"
1329 );
1330 } else {
1331 tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1332 self.send_message(
1333 *peer_id,
1334 RpcOut::Forward {
1335 message: msg,
1336 timeout: Delay::new(self.config.forward_queue_duration()),
1337 },
1338 );
1339 }
1340 }
1341 }
1342 tracing::debug!(peer=%peer_id, "Completed IWANT handling for peer");
1343 }
1344
1345 fn handle_graft(&mut self, peer_id: &PeerId, topics: Vec<TopicHash>) {
1348 tracing::debug!(peer=%peer_id, "Handling GRAFT message for peer");
1349
1350 let mut to_prune_topics = HashSet::new();
1351
1352 let mut do_px = self.config.do_px();
1353
1354 let Some(connected_peer) = self.connected_peers.get_mut(peer_id) else {
1355 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
1356 return;
1357 };
1358
1359 for topic in &topics {
1362 if connected_peer.topics.insert(topic.clone()) {
1363 if let Some(m) = self.metrics.as_mut() {
1364 m.inc_topic_peers(topic);
1365 }
1366 }
1367 }
1368
1369 if self.explicit_peers.contains(peer_id) {
1371 tracing::warn!(peer=%peer_id, "GRAFT: ignoring request from direct peer");
1372 to_prune_topics = topics.into_iter().collect();
1374 do_px = false
1376 } else {
1377 let (below_zero, score) = self.score_below_threshold(peer_id, |_| 0.0);
1378 let now = Instant::now();
1379 for topic_hash in topics {
1380 if let Some(peers) = self.mesh.get_mut(&topic_hash) {
1381 if peers.contains(peer_id) {
1383 tracing::debug!(
1384 peer=%peer_id,
1385 topic=%&topic_hash,
1386 "GRAFT: Received graft for peer that is already in topic"
1387 );
1388 continue;
1389 }
1390
1391 if let Some(backoff_time) = self.backoffs.get_backoff_time(&topic_hash, peer_id)
1393 {
1394 if backoff_time > now {
1395 tracing::warn!(
1396 peer=%peer_id,
1397 "[Penalty] Peer attempted graft within backoff time, penalizing"
1398 );
1399 if let Some((peer_score, ..)) = &mut self.peer_score {
1401 if let Some(metrics) = self.metrics.as_mut() {
1402 metrics.register_score_penalty(Penalty::GraftBackoff);
1403 }
1404 peer_score.add_penalty(peer_id, 1);
1405
1406 #[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
1409 let flood_cutoff = (backoff_time
1410 + self.config.graft_flood_threshold())
1411 - self.config.prune_backoff();
1412 if flood_cutoff > now {
1413 peer_score.add_penalty(peer_id, 1);
1415 }
1416 }
1417 do_px = false;
1419
1420 to_prune_topics.insert(topic_hash.clone());
1421 continue;
1422 }
1423 }
1424
1425 if below_zero {
1427 tracing::debug!(
1429 peer=%peer_id,
1430 %score,
1431 topic=%topic_hash,
1432 "GRAFT: ignoring peer with negative score"
1433 );
1434 to_prune_topics.insert(topic_hash.clone());
1437 do_px = false;
1439 continue;
1440 }
1441
1442 if peers.len() >= self.config.mesh_n_high()
1445 && !self.outbound_peers.contains(peer_id)
1446 {
1447 to_prune_topics.insert(topic_hash.clone());
1448 continue;
1449 }
1450
1451 tracing::debug!(
1453 peer=%peer_id,
1454 topic=%topic_hash,
1455 "GRAFT: Mesh link added for peer in topic"
1456 );
1457
1458 if peers.insert(*peer_id) {
1459 if let Some(m) = self.metrics.as_mut() {
1460 m.peers_included(&topic_hash, Inclusion::Subscribed, 1)
1461 }
1462 }
1463
1464 peer_added_to_mesh(
1466 *peer_id,
1467 vec![&topic_hash],
1468 &self.mesh,
1469 &mut self.events,
1470 &self.connected_peers,
1471 );
1472
1473 if let Some((peer_score, ..)) = &mut self.peer_score {
1474 peer_score.graft(peer_id, topic_hash);
1475 }
1476 } else {
1477 do_px = false;
1479 tracing::debug!(
1480 peer=%peer_id,
1481 topic=%topic_hash,
1482 "GRAFT: Received graft for unknown topic from peer"
1483 );
1484 continue;
1486 }
1487 }
1488 }
1489
1490 if !to_prune_topics.is_empty() {
1491 let on_unsubscribe = false;
1493
1494 for prune in to_prune_topics
1495 .iter()
1496 .map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
1497 .collect::<Vec<_>>()
1498 {
1499 self.send_message(*peer_id, RpcOut::Prune(prune));
1500 }
1501 tracing::debug!(
1503 peer=%peer_id,
1504 "GRAFT: Not subscribed to topics - Sending PRUNE to peer"
1505 );
1506 }
1507 tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
1508 }
1509
1510 fn remove_peer_from_mesh(
1511 &mut self,
1512 peer_id: &PeerId,
1513 topic_hash: &TopicHash,
1514 backoff: Option<u64>,
1515 always_update_backoff: bool,
1516 reason: Churn,
1517 ) {
1518 let mut update_backoff = always_update_backoff;
1519 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1520 if peers.remove(peer_id) {
1522 tracing::debug!(
1523 peer=%peer_id,
1524 topic=%topic_hash,
1525 "PRUNE: Removing peer from the mesh for topic"
1526 );
1527 if let Some(m) = self.metrics.as_mut() {
1528 m.peers_removed(topic_hash, reason, 1)
1529 }
1530
1531 if let Some((peer_score, ..)) = &mut self.peer_score {
1532 peer_score.prune(peer_id, topic_hash.clone());
1533 }
1534
1535 update_backoff = true;
1536
1537 peer_removed_from_mesh(
1539 *peer_id,
1540 topic_hash,
1541 &self.mesh,
1542 &mut self.events,
1543 &self.connected_peers,
1544 );
1545 }
1546 }
1547 if update_backoff {
1548 let time = if let Some(backoff) = backoff {
1549 Duration::from_secs(backoff)
1550 } else {
1551 self.config.prune_backoff()
1552 };
1553 self.backoffs.update_backoff(topic_hash, peer_id, time);
1555 }
1556 }
1557
1558 fn handle_prune(
1560 &mut self,
1561 peer_id: &PeerId,
1562 prune_data: Vec<(TopicHash, Vec<PeerInfo>, Option<u64>)>,
1563 ) {
1564 tracing::debug!(peer=%peer_id, "Handling PRUNE message for peer");
1565 let (below_threshold, score) =
1566 self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold);
1567 for (topic_hash, px, backoff) in prune_data {
1568 self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true, Churn::Prune);
1569
1570 if self.mesh.contains_key(&topic_hash) {
1571 if !px.is_empty() {
1573 if below_threshold {
1575 tracing::debug!(
1576 peer=%peer_id,
1577 %score,
1578 topic=%topic_hash,
1579 "PRUNE: ignoring PX from peer with insufficient score"
1580 );
1581 continue;
1582 }
1583
1584 if self.config.prune_peers() > 0 {
1591 self.px_connect(px);
1592 }
1593 }
1594 }
1595 }
1596 tracing::debug!(peer=%peer_id, "Completed PRUNE handling for peer");
1597 }
1598
1599 fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
1600 let n = self.config.prune_peers();
1601 px.retain(|p| p.peer_id.is_some());
1606 if px.len() > n {
1607 let mut rng = thread_rng();
1609 px.partial_shuffle(&mut rng, n);
1610 px = px.into_iter().take(n).collect();
1611 }
1612
1613 for p in px {
1614 if let Some(peer_id) = p.peer_id {
1617 self.px_peers.insert(peer_id);
1619
1620 self.events.push_back(ToSwarm::Dial {
1622 opts: DialOpts::peer_id(peer_id).build(),
1623 });
1624 }
1625 }
1626 }
1627
1628 fn message_is_valid(
1631 &mut self,
1632 msg_id: &MessageId,
1633 raw_message: &mut RawMessage,
1634 propagation_source: &PeerId,
1635 ) -> bool {
1636 tracing::debug!(
1637 peer=%propagation_source,
1638 message=%msg_id,
1639 "Handling message from peer"
1640 );
1641
1642 if self.blacklisted_peers.contains(propagation_source) {
1644 tracing::debug!(
1645 peer=%propagation_source,
1646 "Rejecting message from blacklisted peer"
1647 );
1648 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1649 peer_score.reject_message(
1650 propagation_source,
1651 msg_id,
1652 &raw_message.topic,
1653 RejectReason::BlackListedPeer,
1654 );
1655 gossip_promises.reject_message(msg_id, &RejectReason::BlackListedPeer);
1656 }
1657 return false;
1658 }
1659
1660 if let Some(source) = raw_message.source.as_ref() {
1662 if self.blacklisted_peers.contains(source) {
1663 tracing::debug!(
1664 peer=%propagation_source,
1665 %source,
1666 "Rejecting message from peer because of blacklisted source"
1667 );
1668 self.handle_invalid_message(
1669 propagation_source,
1670 raw_message,
1671 RejectReason::BlackListedSource,
1672 );
1673 return false;
1674 }
1675 }
1676
1677 if !self.config.validate_messages() {
1681 raw_message.validated = true;
1682 }
1683
1684 let self_published = !self.config.allow_self_origin()
1686 && if let Some(own_id) = self.publish_config.get_own_id() {
1687 own_id != propagation_source
1688 && raw_message.source.as_ref().is_some_and(|s| s == own_id)
1689 } else {
1690 self.published_message_ids.contains(msg_id)
1691 };
1692
1693 if self_published {
1694 tracing::debug!(
1695 message=%msg_id,
1696 source=%propagation_source,
1697 "Dropping message claiming to be from self but forwarded from source"
1698 );
1699 self.handle_invalid_message(propagation_source, raw_message, RejectReason::SelfOrigin);
1700 return false;
1701 }
1702
1703 true
1704 }
1705
1706 fn handle_received_message(
1710 &mut self,
1711 mut raw_message: RawMessage,
1712 propagation_source: &PeerId,
1713 ) {
1714 if let Some(metrics) = self.metrics.as_mut() {
1716 metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len());
1717 }
1718
1719 let message = match self.data_transform.inbound_transform(raw_message.clone()) {
1721 Ok(message) => message,
1722 Err(e) => {
1723 tracing::debug!("Invalid message. Transform error: {:?}", e);
1724 self.handle_invalid_message(
1726 propagation_source,
1727 &raw_message,
1728 RejectReason::ValidationError(ValidationError::TransformFailed),
1729 );
1730 return;
1731 }
1732 };
1733
1734 let msg_id = self.config.message_id(&message);
1736
1737 if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) {
1741 return;
1742 }
1743
1744 if !self.duplicate_cache.insert(msg_id.clone()) {
1745 tracing::debug!(message=%msg_id, "Message already received, ignoring");
1746 if let Some((peer_score, ..)) = &mut self.peer_score {
1747 peer_score.duplicated_message(propagation_source, &msg_id, &message.topic);
1748 }
1749 self.mcache.observe_duplicate(&msg_id, propagation_source);
1750 return;
1751 }
1752 tracing::debug!(
1753 message=%msg_id,
1754 "Put message in duplicate_cache and resolve promises"
1755 );
1756
1757 if let Some(metrics) = self.metrics.as_mut() {
1759 metrics.msg_recvd(&message.topic);
1760 }
1761
1762 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1765 peer_score.validate_message(propagation_source, &msg_id, &message.topic);
1766 gossip_promises.message_delivered(&msg_id);
1767 }
1768
1769 self.mcache.put(&msg_id, raw_message.clone());
1771
1772 if self.mesh.contains_key(&message.topic) {
1774 tracing::debug!("Sending received message to user");
1775 self.events
1776 .push_back(ToSwarm::GenerateEvent(Event::Message {
1777 propagation_source: *propagation_source,
1778 message_id: msg_id.clone(),
1779 message,
1780 }));
1781 } else {
1782 tracing::debug!(
1783 topic=%message.topic,
1784 "Received message on a topic we are not subscribed to"
1785 );
1786 return;
1787 }
1788
1789 if !self.config.validate_messages() {
1791 self.forward_msg(
1792 &msg_id,
1793 raw_message,
1794 Some(propagation_source),
1795 HashSet::new(),
1796 );
1797 tracing::debug!(message=%msg_id, "Completed message handling for message");
1798 }
1799 }
1800
1801 fn handle_invalid_message(
1803 &mut self,
1804 propagation_source: &PeerId,
1805 raw_message: &RawMessage,
1806 reject_reason: RejectReason,
1807 ) {
1808 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
1809 if let Some(metrics) = self.metrics.as_mut() {
1810 metrics.register_invalid_message(&raw_message.topic);
1811 }
1812
1813 if let Ok(message) = self.data_transform.inbound_transform(raw_message.clone()) {
1814 let message_id = self.config.message_id(&message);
1815
1816 peer_score.reject_message(
1817 propagation_source,
1818 &message_id,
1819 &message.topic,
1820 reject_reason,
1821 );
1822
1823 gossip_promises.reject_message(&message_id, &reject_reason);
1824 } else {
1825 peer_score.reject_invalid_message(propagation_source, &raw_message.topic);
1829 }
1830 }
1831 }
1832
1833 fn handle_received_subscriptions(
1835 &mut self,
1836 subscriptions: &[Subscription],
1837 propagation_source: &PeerId,
1838 ) {
1839 tracing::debug!(
1840 source=%propagation_source,
1841 "Handling subscriptions: {:?}",
1842 subscriptions,
1843 );
1844
1845 let mut unsubscribed_peers = Vec::new();
1846
1847 let Some(peer) = self.connected_peers.get_mut(propagation_source) else {
1848 tracing::error!(
1849 peer=%propagation_source,
1850 "Subscription by unknown peer"
1851 );
1852 return;
1853 };
1854
1855 let mut topics_to_graft = Vec::new();
1857
1858 let mut application_event = Vec::new();
1860
1861 let filtered_topics = match self
1862 .subscription_filter
1863 .filter_incoming_subscriptions(subscriptions, &peer.topics)
1864 {
1865 Ok(topics) => topics,
1866 Err(s) => {
1867 tracing::error!(
1868 peer=%propagation_source,
1869 "Subscription filter error: {}; ignoring RPC from peer",
1870 s
1871 );
1872 return;
1873 }
1874 };
1875
1876 for subscription in filtered_topics {
1877 let topic_hash = &subscription.topic_hash;
1879
1880 match subscription.action {
1881 SubscriptionAction::Subscribe => {
1882 if peer.topics.insert(topic_hash.clone()) {
1883 tracing::debug!(
1884 peer=%propagation_source,
1885 topic=%topic_hash,
1886 "SUBSCRIPTION: Adding gossip peer to topic"
1887 );
1888
1889 if let Some(m) = self.metrics.as_mut() {
1890 m.inc_topic_peers(topic_hash);
1891 }
1892 }
1893
1894 if !self.explicit_peers.contains(propagation_source)
1896 && matches!(peer.kind, PeerKind::Gossipsubv1_1 | PeerKind::Gossipsub)
1897 && !Self::score_below_threshold_from_scores(
1898 &self.peer_score,
1899 propagation_source,
1900 |_| 0.0,
1901 )
1902 .0
1903 && !self
1904 .backoffs
1905 .is_backoff_with_slack(topic_hash, propagation_source)
1906 {
1907 if let Some(peers) = self.mesh.get_mut(topic_hash) {
1908 if peers.len() < self.config.mesh_n_low()
1909 && peers.insert(*propagation_source)
1910 {
1911 tracing::debug!(
1912 peer=%propagation_source,
1913 topic=%topic_hash,
1914 "SUBSCRIPTION: Adding peer to the mesh for topic"
1915 );
1916 if let Some(m) = self.metrics.as_mut() {
1917 m.peers_included(topic_hash, Inclusion::Subscribed, 1)
1918 }
1919 tracing::debug!(
1921 peer=%propagation_source,
1922 topic=%topic_hash,
1923 "Sending GRAFT to peer for topic"
1924 );
1925 if let Some((peer_score, ..)) = &mut self.peer_score {
1926 peer_score.graft(propagation_source, topic_hash.clone());
1927 }
1928 topics_to_graft.push(topic_hash.clone());
1929 }
1930 }
1931 }
1932 application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
1934 peer_id: *propagation_source,
1935 topic: topic_hash.clone(),
1936 }));
1937 }
1938 SubscriptionAction::Unsubscribe => {
1939 if peer.topics.remove(topic_hash) {
1940 tracing::debug!(
1941 peer=%propagation_source,
1942 topic=%topic_hash,
1943 "SUBSCRIPTION: Removing gossip peer from topic"
1944 );
1945
1946 if let Some(m) = self.metrics.as_mut() {
1947 m.dec_topic_peers(topic_hash);
1948 }
1949 }
1950
1951 unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
1952 application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
1954 peer_id: *propagation_source,
1955 topic: topic_hash.clone(),
1956 }));
1957 }
1958 }
1959 }
1960
1961 for (peer_id, topic_hash) in unsubscribed_peers {
1963 self.fanout
1964 .get_mut(&topic_hash)
1965 .map(|peers| peers.remove(&peer_id));
1966 self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false, Churn::Unsub);
1967 }
1968
1969 let topics_joined = topics_to_graft.iter().collect::<Vec<_>>();
1971 if !topics_joined.is_empty() {
1972 peer_added_to_mesh(
1973 *propagation_source,
1974 topics_joined,
1975 &self.mesh,
1976 &mut self.events,
1977 &self.connected_peers,
1978 );
1979 }
1980
1981 for topic_hash in topics_to_graft.into_iter() {
1984 self.send_message(*propagation_source, RpcOut::Graft(Graft { topic_hash }));
1985 }
1986
1987 for event in application_event {
1989 self.events.push_back(event);
1990 }
1991
1992 tracing::trace!(
1993 source=%propagation_source,
1994 "Completed handling subscriptions from source"
1995 );
1996 }
1997
1998 fn apply_iwant_penalties(&mut self) {
2000 if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
2001 for (peer, count) in gossip_promises.get_broken_promises() {
2002 peer_score.add_penalty(&peer, count);
2003 if let Some(metrics) = self.metrics.as_mut() {
2004 metrics.register_score_penalty(Penalty::BrokenPromise);
2005 }
2006 }
2007 }
2008 }
2009
2010 fn heartbeat(&mut self) {
2012 tracing::debug!("Starting heartbeat");
2013 let start = Instant::now();
2014
2015 if let Some(m) = &mut self.metrics {
2019 for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
2020 m.observe_priority_queue_size(sender_queue.priority_queue_len());
2021 m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
2022 }
2023 }
2024
2025 self.heartbeat_ticks += 1;
2026
2027 let mut to_graft = HashMap::new();
2028 let mut to_prune = HashMap::new();
2029 let mut no_px = HashSet::new();
2030
2031 self.backoffs.heartbeat();
2033
2034 self.count_sent_iwant.clear();
2036 self.count_received_ihave.clear();
2037
2038 self.apply_iwant_penalties();
2040
2041 if self.heartbeat_ticks % self.config.check_explicit_peers_ticks() == 0 {
2043 for p in self.explicit_peers.clone() {
2044 self.check_explicit_peer_connection(&p);
2045 }
2046 }
2047
2048 let mut scores = HashMap::with_capacity(self.connected_peers.len());
2050 if let Some((peer_score, ..)) = &self.peer_score {
2051 for peer_id in self.connected_peers.keys() {
2052 scores
2053 .entry(peer_id)
2054 .or_insert_with(|| peer_score.metric_score(peer_id, self.metrics.as_mut()));
2055 }
2056 }
2057
2058 for (topic_hash, peers) in self.mesh.iter_mut() {
2060 let explicit_peers = &self.explicit_peers;
2061 let backoffs = &self.backoffs;
2062 let outbound_peers = &self.outbound_peers;
2063
2064 let mut to_remove_peers = Vec::new();
2068 for peer_id in peers.iter() {
2069 let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2070
2071 if let Some(metrics) = self.metrics.as_mut() {
2073 metrics.observe_mesh_peers_score(topic_hash, peer_score);
2074 }
2075
2076 if peer_score < 0.0 {
2077 tracing::debug!(
2078 peer=%peer_id,
2079 score=%peer_score,
2080 topic=%topic_hash,
2081 "HEARTBEAT: Prune peer with negative score"
2082 );
2083
2084 let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new);
2085 current_topic.push(topic_hash.clone());
2086 no_px.insert(*peer_id);
2087 to_remove_peers.push(*peer_id);
2088 }
2089 }
2090
2091 if let Some(m) = self.metrics.as_mut() {
2092 m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len())
2093 }
2094
2095 for peer_id in to_remove_peers {
2096 peers.remove(&peer_id);
2097 }
2098
2099 if peers.len() < self.config.mesh_n_low() {
2101 tracing::debug!(
2102 topic=%topic_hash,
2103 "HEARTBEAT: Mesh low. Topic contains: {} needs: {}",
2104 peers.len(),
2105 self.config.mesh_n_low()
2106 );
2107 let desired_peers = self.config.mesh_n() - peers.len();
2109 let peer_list =
2110 get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2111 !peers.contains(peer)
2112 && !explicit_peers.contains(peer)
2113 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2114 && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2115 });
2116 for peer in &peer_list {
2117 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2118 current_topic.push(topic_hash.clone());
2119 }
2120 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2122 if let Some(m) = self.metrics.as_mut() {
2123 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2124 }
2125 peers.extend(peer_list);
2126 }
2127
2128 if peers.len() > self.config.mesh_n_high() {
2130 tracing::debug!(
2131 topic=%topic_hash,
2132 "HEARTBEAT: Mesh high. Topic contains: {} needs: {}",
2133 peers.len(),
2134 self.config.mesh_n_high()
2135 );
2136 let excess_peer_no = peers.len() - self.config.mesh_n();
2137
2138 let mut rng = thread_rng();
2140 let mut shuffled = peers.iter().copied().collect::<Vec<_>>();
2141 shuffled.shuffle(&mut rng);
2142 shuffled.sort_by(|p1, p2| {
2143 let score_p1 = *scores.get(p1).unwrap_or(&0.0);
2144 let score_p2 = *scores.get(p2).unwrap_or(&0.0);
2145
2146 score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal)
2147 });
2148 shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng);
2150
2151 let mut outbound = {
2153 let outbound_peers = &self.outbound_peers;
2154 shuffled
2155 .iter()
2156 .filter(|p| outbound_peers.contains(*p))
2157 .count()
2158 };
2159
2160 let mut removed = 0;
2163 for peer in shuffled {
2164 if removed == excess_peer_no {
2165 break;
2166 }
2167 if self.outbound_peers.contains(&peer) {
2168 if outbound <= self.config.mesh_outbound_min() {
2169 continue;
2171 }
2172 outbound -= 1;
2174 }
2175
2176 peers.remove(&peer);
2178 let current_topic = to_prune.entry(peer).or_insert_with(Vec::new);
2179 current_topic.push(topic_hash.clone());
2180 removed += 1;
2181 }
2182
2183 if let Some(m) = self.metrics.as_mut() {
2184 m.peers_removed(topic_hash, Churn::Excess, removed)
2185 }
2186 }
2187
2188 if peers.len() >= self.config.mesh_n_low() {
2190 let outbound = { peers.iter().filter(|p| outbound_peers.contains(*p)).count() };
2192
2193 if outbound < self.config.mesh_outbound_min() {
2195 let needed = self.config.mesh_outbound_min() - outbound;
2196 let peer_list =
2197 get_random_peers(&self.connected_peers, topic_hash, needed, |peer| {
2198 !peers.contains(peer)
2199 && !explicit_peers.contains(peer)
2200 && !backoffs.is_backoff_with_slack(topic_hash, peer)
2201 && *scores.get(peer).unwrap_or(&0.0) >= 0.0
2202 && outbound_peers.contains(peer)
2203 });
2204 for peer in &peer_list {
2205 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2206 current_topic.push(topic_hash.clone());
2207 }
2208 tracing::debug!("Updating mesh, new mesh: {:?}", peer_list);
2210 if let Some(m) = self.metrics.as_mut() {
2211 m.peers_included(topic_hash, Inclusion::Outbound, peer_list.len())
2212 }
2213 peers.extend(peer_list);
2214 }
2215 }
2216
2217 if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
2219 && peers.len() > 1
2220 && self.peer_score.is_some()
2221 {
2222 if let Some((_, thresholds, _, _)) = &self.peer_score {
2223 let mut peers_by_score: Vec<_> = peers.iter().collect();
2233 peers_by_score.sort_by(|p1, p2| {
2234 let p1_score = *scores.get(p1).unwrap_or(&0.0);
2235 let p2_score = *scores.get(p2).unwrap_or(&0.0);
2236 p1_score.partial_cmp(&p2_score).unwrap_or(Equal)
2237 });
2238
2239 let middle = peers_by_score.len() / 2;
2240 let median = if peers_by_score.len() % 2 == 0 {
2241 let sub_middle_peer = *peers_by_score
2242 .get(middle - 1)
2243 .expect("middle < vector length and middle > 0 since peers.len() > 0");
2244 let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0);
2245 let middle_peer =
2246 *peers_by_score.get(middle).expect("middle < vector length");
2247 let middle_score = *scores.get(middle_peer).unwrap_or(&0.0);
2248
2249 (sub_middle_score + middle_score) * 0.5
2250 } else {
2251 *scores
2252 .get(*peers_by_score.get(middle).expect("middle < vector length"))
2253 .unwrap_or(&0.0)
2254 };
2255
2256 if median < thresholds.opportunistic_graft_threshold {
2259 let peer_list = get_random_peers(
2260 &self.connected_peers,
2261 topic_hash,
2262 self.config.opportunistic_graft_peers(),
2263 |peer_id| {
2264 !peers.contains(peer_id)
2265 && !explicit_peers.contains(peer_id)
2266 && !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2267 && *scores.get(peer_id).unwrap_or(&0.0) > median
2268 },
2269 );
2270 for peer in &peer_list {
2271 let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
2272 current_topic.push(topic_hash.clone());
2273 }
2274 tracing::debug!(
2276 topic=%topic_hash,
2277 "Opportunistically graft in topic with peers {:?}",
2278 peer_list
2279 );
2280 if let Some(m) = self.metrics.as_mut() {
2281 m.peers_included(topic_hash, Inclusion::Random, peer_list.len())
2282 }
2283 peers.extend(peer_list);
2284 }
2285 }
2286 }
2287 if let Some(m) = self.metrics.as_mut() {
2289 m.set_mesh_peers(topic_hash, peers.len())
2290 }
2291 }
2292
2293 {
2295 let fanout = &mut self.fanout; let fanout_ttl = self.config.fanout_ttl();
2297 self.fanout_last_pub.retain(|topic_hash, last_pub_time| {
2298 if *last_pub_time + fanout_ttl < Instant::now() {
2299 tracing::debug!(
2300 topic=%topic_hash,
2301 "HEARTBEAT: Fanout topic removed due to timeout"
2302 );
2303 fanout.remove(topic_hash);
2304 return false;
2305 }
2306 true
2307 });
2308 }
2309
2310 for (topic_hash, peers) in self.fanout.iter_mut() {
2313 let mut to_remove_peers = Vec::new();
2314 let publish_threshold = match &self.peer_score {
2315 Some((_, thresholds, _, _)) => thresholds.publish_threshold,
2316 _ => 0.0,
2317 };
2318 for peer_id in peers.iter() {
2319 let peer_score = *scores.get(peer_id).unwrap_or(&0.0);
2321 match self.connected_peers.get(peer_id) {
2322 Some(peer) => {
2323 if !peer.topics.contains(topic_hash) || peer_score < publish_threshold {
2324 tracing::debug!(
2325 topic=%topic_hash,
2326 "HEARTBEAT: Peer removed from fanout for topic"
2327 );
2328 to_remove_peers.push(*peer_id);
2329 }
2330 }
2331 None => {
2332 to_remove_peers.push(*peer_id);
2334 }
2335 }
2336 }
2337 for to_remove in to_remove_peers {
2338 peers.remove(&to_remove);
2339 }
2340
2341 if peers.len() < self.config.mesh_n() {
2343 tracing::debug!(
2344 "HEARTBEAT: Fanout low. Contains: {:?} needs: {:?}",
2345 peers.len(),
2346 self.config.mesh_n()
2347 );
2348 let needed_peers = self.config.mesh_n() - peers.len();
2349 let explicit_peers = &self.explicit_peers;
2350 let new_peers =
2351 get_random_peers(&self.connected_peers, topic_hash, needed_peers, |peer_id| {
2352 !peers.contains(peer_id)
2353 && !explicit_peers.contains(peer_id)
2354 && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold
2355 });
2356 peers.extend(new_peers);
2357 }
2358 }
2359
2360 if self.peer_score.is_some() {
2361 tracing::trace!("Mesh message deliveries: {:?}", {
2362 self.mesh
2363 .iter()
2364 .map(|(t, peers)| {
2365 (
2366 t.clone(),
2367 peers
2368 .iter()
2369 .map(|p| {
2370 (
2371 *p,
2372 self.peer_score
2373 .as_ref()
2374 .expect("peer_score.is_some()")
2375 .0
2376 .mesh_message_deliveries(p, t)
2377 .unwrap_or(0.0),
2378 )
2379 })
2380 .collect::<HashMap<PeerId, f64>>(),
2381 )
2382 })
2383 .collect::<HashMap<TopicHash, HashMap<PeerId, f64>>>()
2384 })
2385 }
2386
2387 self.emit_gossip();
2388
2389 if !to_graft.is_empty() | !to_prune.is_empty() {
2391 self.send_graft_prune(to_graft, to_prune, no_px);
2392 }
2393
2394 self.mcache.shift();
2396
2397 for (peer_id, failed_messages) in self.failed_messages.drain() {
2399 tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
2400 self.events
2401 .push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
2402 peer_id,
2403 failed_messages,
2404 }));
2405 }
2406 self.failed_messages.shrink_to_fit();
2407
2408 tracing::debug!("Completed Heartbeat");
2409 if let Some(metrics) = self.metrics.as_mut() {
2410 let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
2411 metrics.observe_heartbeat_duration(duration);
2412 }
2413 }
2414
2415 fn emit_gossip(&mut self) {
2418 let mut rng = thread_rng();
2419 let mut messages = Vec::new();
2420 for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) {
2421 let mut message_ids = self.mcache.get_gossip_message_ids(topic_hash);
2422 if message_ids.is_empty() {
2423 continue;
2424 }
2425
2426 if message_ids.len() > self.config.max_ihave_length() {
2428 tracing::debug!(
2430 "too many messages for gossip; will truncate IHAVE list ({} messages)",
2431 message_ids.len()
2432 );
2433 } else {
2434 message_ids.shuffle(&mut rng);
2436 }
2437
2438 let n_map = |m| {
2440 max(
2441 self.config.gossip_lazy(),
2442 (self.config.gossip_factor() * m as f64) as usize,
2443 )
2444 };
2445 let to_msg_peers =
2447 get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2448 !peers.contains(peer)
2449 && !self.explicit_peers.contains(peer)
2450 && !self.score_below_threshold(peer, |ts| ts.gossip_threshold).0
2451 });
2452
2453 tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
2454
2455 for peer_id in to_msg_peers {
2456 let mut peer_message_ids = message_ids.clone();
2457
2458 if peer_message_ids.len() > self.config.max_ihave_length() {
2459 peer_message_ids.partial_shuffle(&mut rng, self.config.max_ihave_length());
2463 peer_message_ids.truncate(self.config.max_ihave_length());
2464 }
2465
2466 messages.push((
2468 peer_id,
2469 RpcOut::IHave(IHave {
2470 topic_hash: topic_hash.clone(),
2471 message_ids: peer_message_ids,
2472 }),
2473 ));
2474 }
2475 }
2476 for (peer_id, message) in messages {
2477 self.send_message(peer_id, message);
2478 }
2479 }
2480
2481 fn send_graft_prune(
2484 &mut self,
2485 to_graft: HashMap<PeerId, Vec<TopicHash>>,
2486 mut to_prune: HashMap<PeerId, Vec<TopicHash>>,
2487 no_px: HashSet<PeerId>,
2488 ) {
2489 for (peer_id, topics) in to_graft.into_iter() {
2491 for topic in &topics {
2492 if let Some((peer_score, ..)) = &mut self.peer_score {
2494 peer_score.graft(&peer_id, topic.clone());
2495 }
2496
2497 peer_added_to_mesh(
2500 peer_id,
2501 vec![topic],
2502 &self.mesh,
2503 &mut self.events,
2504 &self.connected_peers,
2505 );
2506 }
2507 let rpc_msgs = topics.iter().map(|topic_hash| {
2508 RpcOut::Graft(Graft {
2509 topic_hash: topic_hash.clone(),
2510 })
2511 });
2512
2513 let prune_msgs = to_prune
2520 .remove(&peer_id)
2521 .into_iter()
2522 .flatten()
2523 .map(|topic_hash| {
2524 let prune = self.make_prune(
2525 &topic_hash,
2526 &peer_id,
2527 self.config.do_px() && !no_px.contains(&peer_id),
2528 false,
2529 );
2530 RpcOut::Prune(prune)
2531 });
2532
2533 for msg in rpc_msgs.chain(prune_msgs).collect::<Vec<_>>() {
2535 self.send_message(peer_id, msg);
2536 }
2537 }
2538
2539 for (peer_id, topics) in to_prune.iter() {
2542 for topic_hash in topics {
2543 let prune = self.make_prune(
2544 topic_hash,
2545 peer_id,
2546 self.config.do_px() && !no_px.contains(peer_id),
2547 false,
2548 );
2549 self.send_message(*peer_id, RpcOut::Prune(prune));
2550
2551 peer_removed_from_mesh(
2553 *peer_id,
2554 topic_hash,
2555 &self.mesh,
2556 &mut self.events,
2557 &self.connected_peers,
2558 );
2559 }
2560 }
2561 }
2562
2563 fn forward_msg(
2567 &mut self,
2568 msg_id: &MessageId,
2569 message: RawMessage,
2570 propagation_source: Option<&PeerId>,
2571 originating_peers: HashSet<PeerId>,
2572 ) -> bool {
2573 if let Some((peer_score, ..)) = &mut self.peer_score {
2575 if let Some(peer) = propagation_source {
2576 peer_score.deliver_message(peer, msg_id, &message.topic);
2577 }
2578 }
2579
2580 tracing::debug!(message=%msg_id, "Forwarding message");
2581 let mut recipient_peers = HashSet::new();
2582
2583 for peer_id in &self.explicit_peers {
2587 let Some(peer) = self.connected_peers.get(peer_id) else {
2588 continue;
2589 };
2590 if Some(peer_id) != propagation_source
2591 && !originating_peers.contains(peer_id)
2592 && Some(peer_id) != message.source.as_ref()
2593 && peer.topics.contains(&message.topic)
2594 {
2595 recipient_peers.insert(*peer_id);
2596 }
2597 }
2598
2599 let topic = &message.topic;
2601 if let Some(mesh_peers) = self.mesh.get(topic) {
2603 for peer_id in mesh_peers {
2604 if Some(peer_id) != propagation_source
2605 && !originating_peers.contains(peer_id)
2606 && Some(peer_id) != message.source.as_ref()
2607 {
2608 recipient_peers.insert(*peer_id);
2609 }
2610 }
2611 }
2612
2613 if recipient_peers.is_empty() {
2614 return false;
2615 }
2616
2617 for peer in recipient_peers.iter() {
2619 let event = RpcOut::Forward {
2620 message: message.clone(),
2621 timeout: Delay::new(self.config.forward_queue_duration()),
2622 };
2623 tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
2624 self.send_message(*peer, event);
2625 }
2626 tracing::debug!("Completed forwarding message");
2627 true
2628 }
2629
2630 pub(crate) fn build_raw_message(
2632 &mut self,
2633 topic: TopicHash,
2634 data: Vec<u8>,
2635 ) -> Result<RawMessage, PublishError> {
2636 match &mut self.publish_config {
2637 PublishConfig::Signing {
2638 ref keypair,
2639 author,
2640 inline_key,
2641 last_seq_no,
2642 } => {
2643 let sequence_number = last_seq_no.next();
2644
2645 let signature = {
2646 let message = proto::Message {
2647 from: Some(author.to_bytes()),
2648 data: Some(data.clone()),
2649 seqno: Some(sequence_number.to_be_bytes().to_vec()),
2650 topic: topic.clone().into_string(),
2651 signature: None,
2652 key: None,
2653 };
2654
2655 let mut buf = Vec::with_capacity(message.get_size());
2656 let mut writer = Writer::new(&mut buf);
2657
2658 message
2659 .write_message(&mut writer)
2660 .expect("Encoding to succeed");
2661
2662 let mut signature_bytes = SIGNING_PREFIX.to_vec();
2664 signature_bytes.extend_from_slice(&buf);
2665 Some(keypair.sign(&signature_bytes)?)
2666 };
2667
2668 Ok(RawMessage {
2669 source: Some(*author),
2670 data,
2671 sequence_number: Some(sequence_number),
2674 topic,
2675 signature,
2676 key: inline_key.clone(),
2677 validated: true, })
2679 }
2680 PublishConfig::Author(peer_id) => {
2681 Ok(RawMessage {
2682 source: Some(*peer_id),
2683 data,
2684 sequence_number: Some(rand::random()),
2687 topic,
2688 signature: None,
2689 key: None,
2690 validated: true, })
2692 }
2693 PublishConfig::RandomAuthor => {
2694 Ok(RawMessage {
2695 source: Some(PeerId::random()),
2696 data,
2697 sequence_number: Some(rand::random()),
2700 topic,
2701 signature: None,
2702 key: None,
2703 validated: true, })
2705 }
2706 PublishConfig::Anonymous => {
2707 Ok(RawMessage {
2708 source: None,
2709 data,
2710 sequence_number: None,
2713 topic,
2714 signature: None,
2715 key: None,
2716 validated: true, })
2718 }
2719 }
2720 }
2721
2722 fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> bool {
2729 if let Some(m) = self.metrics.as_mut() {
2730 if let RpcOut::Publish { ref message, .. } | RpcOut::Forward { ref message, .. } = rpc {
2731 m.msg_sent(&message.topic, message.raw_protobuf_len());
2733 }
2734 }
2735
2736 let Some(peer) = &mut self.connected_peers.get_mut(&peer_id) else {
2737 tracing::error!(peer = %peer_id,
2738 "Could not send rpc to connection handler, peer doesn't exist in connected peer list");
2739 return false;
2740 };
2741
2742 match peer.sender.send_message(rpc) {
2744 Ok(()) => true,
2745 Err(rpc) => {
2746 tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);
2748
2749 let failed_messages = self.failed_messages.entry(peer_id).or_default();
2751 match rpc {
2752 RpcOut::Publish { .. } => {
2753 failed_messages.priority += 1;
2754 failed_messages.publish += 1;
2755 }
2756 RpcOut::Forward { .. } => {
2757 failed_messages.non_priority += 1;
2758 failed_messages.forward += 1;
2759 }
2760 RpcOut::IWant(_) | RpcOut::IHave(_) => {
2761 failed_messages.non_priority += 1;
2762 }
2763 RpcOut::Graft(_)
2764 | RpcOut::Prune(_)
2765 | RpcOut::Subscribe(_)
2766 | RpcOut::Unsubscribe(_) => {
2767 unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
2768 }
2769 }
2770
2771 if let Some((peer_score, ..)) = &mut self.peer_score {
2773 peer_score.failed_message_slow_peer(&peer_id);
2774 }
2775
2776 false
2777 }
2778 }
2779 }
2780
2781 fn on_connection_established(
2782 &mut self,
2783 ConnectionEstablished {
2784 peer_id,
2785 endpoint,
2786 other_established,
2787 ..
2788 }: ConnectionEstablished,
2789 ) {
2790 if endpoint.is_dialer() && other_established == 0 && !self.px_peers.contains(&peer_id) {
2794 self.outbound_peers.insert(peer_id);
2797 }
2798
2799 if let Some((peer_score, ..)) = &mut self.peer_score {
2801 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2802 peer_score.add_ip(&peer_id, ip);
2803 } else {
2804 tracing::trace!(
2805 peer=%peer_id,
2806 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2807 endpoint
2808 )
2809 }
2810 }
2811
2812 if other_established > 0 {
2813 return; }
2815
2816 if let Some((peer_score, ..)) = &mut self.peer_score {
2817 peer_score.add_peer(peer_id);
2818 }
2819
2820 if self.blacklisted_peers.contains(&peer_id) {
2822 tracing::debug!(peer=%peer_id, "Ignoring connection from blacklisted peer");
2823 return;
2824 }
2825
2826 tracing::debug!(peer=%peer_id, "New peer connected");
2827 for topic_hash in self.mesh.clone().into_keys() {
2829 self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2830 }
2831 }
2832
2833 fn on_connection_closed(
2834 &mut self,
2835 ConnectionClosed {
2836 peer_id,
2837 connection_id,
2838 endpoint,
2839 remaining_established,
2840 ..
2841 }: ConnectionClosed,
2842 ) {
2843 if let Some((peer_score, ..)) = &mut self.peer_score {
2845 if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) {
2846 peer_score.remove_ip(&peer_id, &ip);
2847 } else {
2848 tracing::trace!(
2849 peer=%peer_id,
2850 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2851 endpoint
2852 )
2853 }
2854 }
2855
2856 if remaining_established != 0 {
2857 if let Some(peer) = self.connected_peers.get_mut(&peer_id) {
2859 let index = peer
2860 .connections
2861 .iter()
2862 .position(|v| v == &connection_id)
2863 .expect("Previously established connection to peer must be present");
2864 peer.connections.remove(index);
2865
2866 if !peer.connections.is_empty() {
2869 for topic in &peer.topics {
2870 if let Some(mesh_peers) = self.mesh.get(topic) {
2871 if mesh_peers.contains(&peer_id) {
2872 self.events.push_back(ToSwarm::NotifyHandler {
2873 peer_id,
2874 event: HandlerIn::JoinedMesh,
2875 handler: NotifyHandler::One(peer.connections[0]),
2876 });
2877 break;
2878 }
2879 }
2880 }
2881 }
2882 }
2883 } else {
2884 tracing::debug!(peer=%peer_id, "Peer disconnected");
2886 let Some(connected_peer) = self.connected_peers.get(&peer_id) else {
2887 tracing::error!(peer_id = %peer_id, "Peer non-existent when handling disconnection");
2888 return;
2889 };
2890
2891 for topic in &connected_peer.topics {
2893 if let Some(mesh_peers) = self.mesh.get_mut(topic) {
2895 if mesh_peers.remove(&peer_id) {
2897 if let Some(m) = self.metrics.as_mut() {
2898 m.peers_removed(topic, Churn::Dc, 1);
2899 m.set_mesh_peers(topic, mesh_peers.len());
2900 }
2901 };
2902 }
2903
2904 if let Some(m) = self.metrics.as_mut() {
2905 m.dec_topic_peers(topic);
2906 }
2907
2908 self.fanout
2910 .get_mut(topic)
2911 .map(|peers| peers.remove(&peer_id));
2912 }
2913
2914 self.px_peers.remove(&peer_id);
2916 self.outbound_peers.remove(&peer_id);
2917
2918 if let Some(metrics) = self.metrics.as_mut() {
2920 metrics.peer_protocol_disconnected(connected_peer.kind.clone());
2921 }
2922
2923 self.connected_peers.remove(&peer_id);
2924
2925 if let Some((peer_score, ..)) = &mut self.peer_score {
2926 peer_score.remove_peer(&peer_id);
2927 }
2928 }
2929 }
2930
2931 fn on_address_change(
2932 &mut self,
2933 AddressChange {
2934 peer_id,
2935 old: endpoint_old,
2936 new: endpoint_new,
2937 ..
2938 }: AddressChange,
2939 ) {
2940 if let Some((peer_score, ..)) = &mut self.peer_score {
2942 if let Some(ip) = get_ip_addr(endpoint_old.get_remote_address()) {
2943 peer_score.remove_ip(&peer_id, &ip);
2944 } else {
2945 tracing::trace!(
2946 peer=%&peer_id,
2947 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2948 endpoint_old
2949 )
2950 }
2951 if let Some(ip) = get_ip_addr(endpoint_new.get_remote_address()) {
2952 peer_score.add_ip(&peer_id, ip);
2953 } else {
2954 tracing::trace!(
2955 peer=%peer_id,
2956 "Couldn't extract ip from endpoint of peer with endpoint {:?}",
2957 endpoint_new
2958 )
2959 }
2960 }
2961 }
2962}
2963
2964fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
2965 addr.iter().find_map(|p| match p {
2966 Ip4(addr) => Some(IpAddr::V4(addr)),
2967 Ip6(addr) => Some(IpAddr::V6(addr)),
2968 _ => None,
2969 })
2970}
2971
2972impl<C, F> NetworkBehaviour for Behaviour<C, F>
2973where
2974 C: Send + 'static + DataTransform,
2975 F: Send + 'static + TopicSubscriptionFilter,
2976{
2977 type ConnectionHandler = Handler;
2978 type ToSwarm = Event;
2979
2980 fn handle_established_inbound_connection(
2981 &mut self,
2982 connection_id: ConnectionId,
2983 peer_id: PeerId,
2984 _: &Multiaddr,
2985 _: &Multiaddr,
2986 ) -> Result<THandler<Self>, ConnectionDenied> {
2987 let connected_peer = self
2993 .connected_peers
2994 .entry(peer_id)
2995 .or_insert(PeerConnections {
2996 kind: PeerKind::Floodsub,
2997 connections: vec![],
2998 sender: Sender::new(self.config.connection_handler_queue_len()),
2999 topics: Default::default(),
3000 });
3001 connected_peer.connections.push(connection_id);
3003
3004 Ok(Handler::new(
3005 self.config.protocol_config(),
3006 connected_peer.sender.new_receiver(),
3007 ))
3008 }
3009
3010 fn handle_established_outbound_connection(
3011 &mut self,
3012 connection_id: ConnectionId,
3013 peer_id: PeerId,
3014 _: &Multiaddr,
3015 _: Endpoint,
3016 _: PortUse,
3017 ) -> Result<THandler<Self>, ConnectionDenied> {
3018 let connected_peer = self
3019 .connected_peers
3020 .entry(peer_id)
3021 .or_insert(PeerConnections {
3022 kind: PeerKind::Floodsub,
3023 connections: vec![],
3024 sender: Sender::new(self.config.connection_handler_queue_len()),
3025 topics: Default::default(),
3026 });
3027 connected_peer.connections.push(connection_id);
3029
3030 Ok(Handler::new(
3031 self.config.protocol_config(),
3032 connected_peer.sender.new_receiver(),
3033 ))
3034 }
3035
3036 fn on_connection_handler_event(
3037 &mut self,
3038 propagation_source: PeerId,
3039 _connection_id: ConnectionId,
3040 handler_event: THandlerOutEvent<Self>,
3041 ) {
3042 match handler_event {
3043 HandlerEvent::PeerKind(kind) => {
3044 if let Some(metrics) = self.metrics.as_mut() {
3047 metrics.peer_protocol_connected(kind.clone());
3048 }
3049
3050 if let PeerKind::NotSupported = kind {
3051 tracing::debug!(
3052 peer=%propagation_source,
3053 "Peer does not support gossipsub protocols"
3054 );
3055 self.events
3056 .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
3057 peer_id: propagation_source,
3058 }));
3059 } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
3060 tracing::debug!(
3064 peer=%propagation_source,
3065 peer_type=%kind,
3066 "New peer type found for peer"
3067 );
3068 if let PeerKind::Floodsub = conn.kind {
3069 conn.kind = kind;
3070 }
3071 }
3072 }
3073 HandlerEvent::MessageDropped(rpc) => {
3074 if let Some((peer_score, _, _, _)) = &mut self.peer_score {
3076 peer_score.failed_message_slow_peer(&propagation_source);
3077 }
3078
3079 let failed_messages = self.failed_messages.entry(propagation_source).or_default();
3081 failed_messages.timeout += 1;
3082 match rpc {
3083 RpcOut::Publish { .. } => {
3084 failed_messages.publish += 1;
3085 }
3086 RpcOut::Forward { .. } => {
3087 failed_messages.forward += 1;
3088 }
3089 _ => {}
3090 }
3091
3092 if let Some(metrics) = self.metrics.as_mut() {
3094 match rpc {
3095 RpcOut::Publish { message, .. } => {
3096 metrics.publish_msg_dropped(&message.topic);
3097 metrics.timeout_msg_dropped(&message.topic);
3098 }
3099 RpcOut::Forward { message, .. } => {
3100 metrics.forward_msg_dropped(&message.topic);
3101 metrics.timeout_msg_dropped(&message.topic);
3102 }
3103 _ => {}
3104 }
3105 }
3106 }
3107 HandlerEvent::Message {
3108 rpc,
3109 invalid_messages,
3110 } => {
3111 if !rpc.subscriptions.is_empty() {
3116 self.handle_received_subscriptions(&rpc.subscriptions, &propagation_source);
3117 }
3118
3119 if let (true, _) =
3121 self.score_below_threshold(&propagation_source, |pst| pst.graylist_threshold)
3122 {
3123 tracing::debug!(peer=%propagation_source, "RPC Dropped from greylisted peer");
3124 return;
3125 }
3126
3127 if self.peer_score.is_some() {
3129 for (raw_message, validation_error) in invalid_messages {
3130 self.handle_invalid_message(
3131 &propagation_source,
3132 &raw_message,
3133 RejectReason::ValidationError(validation_error),
3134 )
3135 }
3136 } else {
3137 for (message, validation_error) in invalid_messages {
3139 tracing::warn!(
3140 peer=%propagation_source,
3141 source=?message.source,
3142 "Invalid message from peer. Reason: {:?}",
3143 validation_error,
3144 );
3145 }
3146 }
3147
3148 for (count, raw_message) in rpc.messages.into_iter().enumerate() {
3150 if self.config.max_messages_per_rpc().is_some()
3152 && Some(count) >= self.config.max_messages_per_rpc()
3153 {
3154 tracing::warn!("Received more messages than permitted. Ignoring further messages. Processed: {}", count);
3155 break;
3156 }
3157 self.handle_received_message(raw_message, &propagation_source);
3158 }
3159
3160 let mut ihave_msgs = vec![];
3164 let mut graft_msgs = vec![];
3165 let mut prune_msgs = vec![];
3166 for control_msg in rpc.control_msgs {
3167 match control_msg {
3168 ControlAction::IHave(IHave {
3169 topic_hash,
3170 message_ids,
3171 }) => {
3172 ihave_msgs.push((topic_hash, message_ids));
3173 }
3174 ControlAction::IWant(IWant { message_ids }) => {
3175 self.handle_iwant(&propagation_source, message_ids)
3176 }
3177 ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
3178 ControlAction::Prune(Prune {
3179 topic_hash,
3180 peers,
3181 backoff,
3182 }) => prune_msgs.push((topic_hash, peers, backoff)),
3183 }
3184 }
3185 if !ihave_msgs.is_empty() {
3186 self.handle_ihave(&propagation_source, ihave_msgs);
3187 }
3188 if !graft_msgs.is_empty() {
3189 self.handle_graft(&propagation_source, graft_msgs);
3190 }
3191 if !prune_msgs.is_empty() {
3192 self.handle_prune(&propagation_source, prune_msgs);
3193 }
3194 }
3195 }
3196 }
3197
3198 #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self, cx))]
3199 fn poll(
3200 &mut self,
3201 cx: &mut Context<'_>,
3202 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
3203 if let Some(event) = self.events.pop_front() {
3204 return Poll::Ready(event);
3205 }
3206
3207 if let Some((peer_score, _, delay, _)) = &mut self.peer_score {
3209 if delay.poll_unpin(cx).is_ready() {
3210 peer_score.refresh_scores();
3211 delay.reset(peer_score.params.decay_interval);
3212 }
3213 }
3214
3215 if self.heartbeat.poll_unpin(cx).is_ready() {
3216 self.heartbeat();
3217 self.heartbeat.reset(self.config.heartbeat_interval());
3218 }
3219
3220 Poll::Pending
3221 }
3222
3223 fn on_swarm_event(&mut self, event: FromSwarm) {
3224 match event {
3225 FromSwarm::ConnectionEstablished(connection_established) => {
3226 self.on_connection_established(connection_established)
3227 }
3228 FromSwarm::ConnectionClosed(connection_closed) => {
3229 self.on_connection_closed(connection_closed)
3230 }
3231 FromSwarm::AddressChange(address_change) => self.on_address_change(address_change),
3232 _ => {}
3233 }
3234 }
3235}
3236
3237fn peer_added_to_mesh(
3241 peer_id: PeerId,
3242 new_topics: Vec<&TopicHash>,
3243 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3244 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3245 connections: &HashMap<PeerId, PeerConnections>,
3246) {
3247 let connection_id = match connections.get(&peer_id) {
3249 Some(p) => p
3250 .connections
3251 .first()
3252 .expect("There should be at least one connection to a peer."),
3253 None => {
3254 tracing::error!(peer_id=%peer_id, "Peer not existent when added to the mesh");
3255 return;
3256 }
3257 };
3258
3259 if let Some(peer) = connections.get(&peer_id) {
3260 for topic in &peer.topics {
3261 if !new_topics.contains(&topic) {
3262 if let Some(mesh_peers) = mesh.get(topic) {
3263 if mesh_peers.contains(&peer_id) {
3264 return;
3266 }
3267 }
3268 }
3269 }
3270 }
3271 events.push_back(ToSwarm::NotifyHandler {
3273 peer_id,
3274 event: HandlerIn::JoinedMesh,
3275 handler: NotifyHandler::One(*connection_id),
3276 });
3277}
3278
3279fn peer_removed_from_mesh(
3283 peer_id: PeerId,
3284 old_topic: &TopicHash,
3285 mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
3286 events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
3287 connections: &HashMap<PeerId, PeerConnections>,
3288) {
3289 let connection_id = match connections.get(&peer_id) {
3291 Some(p) => p
3292 .connections
3293 .first()
3294 .expect("There should be at least one connection to a peer."),
3295 None => {
3296 tracing::error!(peer_id=%peer_id, "Peer not existent when removed from mesh");
3297 return;
3298 }
3299 };
3300
3301 if let Some(peer) = connections.get(&peer_id) {
3302 for topic in &peer.topics {
3303 if topic != old_topic {
3304 if let Some(mesh_peers) = mesh.get(topic) {
3305 if mesh_peers.contains(&peer_id) {
3306 return;
3308 }
3309 }
3310 }
3311 }
3312 }
3313 events.push_back(ToSwarm::NotifyHandler {
3315 peer_id,
3316 event: HandlerIn::LeftMesh,
3317 handler: NotifyHandler::One(*connection_id),
3318 });
3319}
3320
3321fn get_random_peers_dynamic(
3325 connected_peers: &HashMap<PeerId, PeerConnections>,
3326 topic_hash: &TopicHash,
3327 n_map: impl Fn(usize) -> usize,
3329 mut f: impl FnMut(&PeerId) -> bool,
3330) -> BTreeSet<PeerId> {
3331 let mut gossip_peers = connected_peers
3332 .iter()
3333 .filter(|(_, p)| p.topics.contains(topic_hash))
3334 .filter(|(peer_id, _)| f(peer_id))
3335 .filter(|(_, p)| p.kind == PeerKind::Gossipsub || p.kind == PeerKind::Gossipsubv1_1)
3336 .map(|(peer_id, _)| *peer_id)
3337 .collect::<Vec<PeerId>>();
3338
3339 let n = n_map(gossip_peers.len());
3341 if gossip_peers.len() <= n {
3342 tracing::debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
3343 return gossip_peers.into_iter().collect();
3344 }
3345
3346 let mut rng = thread_rng();
3348 gossip_peers.partial_shuffle(&mut rng, n);
3349
3350 tracing::debug!("RANDOM PEERS: Got {:?} peers", n);
3351
3352 gossip_peers.into_iter().take(n).collect()
3353}
3354
3355fn get_random_peers(
3358 connected_peers: &HashMap<PeerId, PeerConnections>,
3359 topic_hash: &TopicHash,
3360 n: usize,
3361 f: impl FnMut(&PeerId) -> bool,
3362) -> BTreeSet<PeerId> {
3363 get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
3364}
3365
3366fn validate_config(
3369 authenticity: &MessageAuthenticity,
3370 validation_mode: &ValidationMode,
3371) -> Result<(), &'static str> {
3372 match validation_mode {
3373 ValidationMode::Anonymous => {
3374 if authenticity.is_signing() {
3375 return Err("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity");
3376 }
3377
3378 if !authenticity.is_anonymous() {
3379 return Err("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config");
3380 }
3381 }
3382 ValidationMode::Strict => {
3383 if !authenticity.is_signing() {
3384 return Err(
3385 "Messages will be
3386 published unsigned and incoming unsigned messages will be rejected. Consider adjusting
3387 the validation or privacy settings in the config"
3388 );
3389 }
3390 }
3391 _ => {}
3392 }
3393 Ok(())
3394}
3395
3396impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
3397 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3398 f.debug_struct("Behaviour")
3399 .field("config", &self.config)
3400 .field("events", &self.events.len())
3401 .field("publish_config", &self.publish_config)
3402 .field("mesh", &self.mesh)
3403 .field("fanout", &self.fanout)
3404 .field("fanout_last_pub", &self.fanout_last_pub)
3405 .field("mcache", &self.mcache)
3406 .field("heartbeat", &self.heartbeat)
3407 .finish()
3408 }
3409}
3410
3411impl fmt::Debug for PublishConfig {
3412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3413 match self {
3414 PublishConfig::Signing { author, .. } => {
3415 f.write_fmt(format_args!("PublishConfig::Signing({author})"))
3416 }
3417 PublishConfig::Author(author) => {
3418 f.write_fmt(format_args!("PublishConfig::Author({author})"))
3419 }
3420 PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")),
3421 PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")),
3422 }
3423 }
3424}