ant_libp2p_gossipsub/config.rs
1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use ant_libp2p_swarm as libp2p_swarm;
22
23use std::{borrow::Cow, sync::Arc, time::Duration};
24
25use libp2p_identity::PeerId;
26use libp2p_swarm::StreamProtocol;
27
28use crate::{
29 error::ConfigBuilderError,
30 protocol::{ProtocolConfig, ProtocolId, FLOODSUB_PROTOCOL},
31 types::{Message, MessageId, PeerKind},
32};
33
/// The types of message validation that can be employed by gossipsub.
///
/// Each variant describes how the author, sequence number and signature fields of an incoming
/// message are treated.
#[derive(Debug, Clone)]
pub enum ValidationMode {
    /// This is the default setting. This requires the message author to be a valid [`PeerId`] and
    /// to be present as well as the sequence number. All messages must have valid signatures.
    ///
    /// NOTE: This setting will reject messages from nodes using
    /// [`crate::behaviour::MessageAuthenticity::Anonymous`] and all messages that do not have
    /// signatures.
    Strict,
    /// This setting permits messages that have no author, sequence number or signature. If any of
    /// these fields exist in the message, they are validated.
    Permissive,
    /// This setting requires the author, sequence number and signature fields of a message to be
    /// empty. Any message that contains these fields is considered invalid.
    Anonymous,
    /// This setting does not check the author, sequence number or signature fields of incoming
    /// messages. If these fields contain data, they are simply ignored.
    ///
    /// NOTE: This setting will consider messages with invalid signatures as valid messages.
    None,
}
56
/// Selector for custom Protocol Id.
///
/// Passed to [`ConfigBuilder::protocol_id`] to choose which [`PeerKind`] a custom protocol id
/// is registered as.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Version {
    /// Register the custom protocol id as [`PeerKind::Gossipsub`].
    V1_0,
    /// Register the custom protocol id as [`PeerKind::Gossipsubv1_1`].
    V1_1,
}
63
/// Configuration parameters that define the performance of the gossipsub network.
///
/// Instances are produced by [`ConfigBuilder`] (or [`Config::default`]). All fields are private;
/// each one is exposed through an accessor method on `Config`, where its meaning and default
/// value are documented.
#[derive(Clone)]
pub struct Config {
    // Protocol-level settings; also holds `max_transmit_size`, `validation_mode` and the
    // negotiated protocol ids.
    protocol: ProtocolConfig,
    history_length: usize,
    history_gossip: usize,
    mesh_n: usize,
    mesh_n_low: usize,
    mesh_n_high: usize,
    retain_scores: usize,
    gossip_lazy: usize,
    gossip_factor: f64,
    heartbeat_initial_delay: Duration,
    heartbeat_interval: Duration,
    fanout_ttl: Duration,
    check_explicit_peers_ticks: u64,
    duplicate_cache_time: Duration,
    validate_messages: bool,
    // User-replaceable function deriving a `MessageId` from a `Message`; shared behind an `Arc`
    // so that `Config` stays `Clone`.
    message_id_fn: Arc<dyn Fn(&Message) -> MessageId + Send + Sync + 'static>,
    allow_self_origin: bool,
    do_px: bool,
    prune_peers: usize,
    prune_backoff: Duration,
    unsubscribe_backoff: Duration,
    backoff_slack: u32,
    flood_publish: bool,
    graft_flood_threshold: Duration,
    mesh_outbound_min: usize,
    opportunistic_graft_ticks: u64,
    opportunistic_graft_peers: usize,
    // NOTE: the spelling ("retransimission") is mirrored by the public getter and builder method
    // names, so it cannot be corrected here without changing the public API.
    gossip_retransimission: u32,
    max_messages_per_rpc: Option<usize>,
    max_ihave_length: usize,
    max_ihave_messages: usize,
    iwant_followup_time: Duration,
    published_message_ids_cache_time: Duration,
    connection_handler_queue_len: usize,
    // Exposed as `publish_queue_duration()`.
    connection_handler_publish_duration: Duration,
    // Exposed as `forward_queue_duration()`.
    connection_handler_forward_duration: Duration,
}
104
impl Config {
    /// Returns a clone of the protocol-level configuration stored in this config.
    pub(crate) fn protocol_config(&self) -> ProtocolConfig {
        self.protocol.clone()
    }

    // Overlay network parameters.
    /// Number of heartbeats to keep in the `memcache` (default is 5).
    pub fn history_length(&self) -> usize {
        self.history_length
    }

    /// Number of past heartbeats to gossip about (default is 3).
    pub fn history_gossip(&self) -> usize {
        self.history_gossip
    }

    /// Target number of peers for the mesh network (D in the spec, default is 6).
    pub fn mesh_n(&self) -> usize {
        self.mesh_n
    }

    /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 5).
    pub fn mesh_n_low(&self) -> usize {
        self.mesh_n_low
    }

    /// Maximum number of peers in mesh network before removing some (D_high in the spec, default
    /// is 12).
    pub fn mesh_n_high(&self) -> usize {
        self.mesh_n_high
    }

    /// Affects how peers are selected when pruning a mesh due to over subscription.
    ///
    /// At least `retain_scores` of the retained peers will be high-scoring, while the remainder
    /// are chosen randomly (D_score in the spec, default is 4).
    pub fn retain_scores(&self) -> usize {
        self.retain_scores
    }

    /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec,
    /// default is 6).
    pub fn gossip_lazy(&self) -> usize {
        self.gossip_lazy
    }

    /// Affects how many peers we will emit gossip to at each heartbeat.
    ///
    /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or
    /// `gossip_lazy`, whichever is greater. The default is 0.25.
    pub fn gossip_factor(&self) -> f64 {
        self.gossip_factor
    }

    /// Initial delay in each heartbeat (default is 5 seconds).
    pub fn heartbeat_initial_delay(&self) -> Duration {
        self.heartbeat_initial_delay
    }

    /// Time between each heartbeat (default is 1 second).
    pub fn heartbeat_interval(&self) -> Duration {
        self.heartbeat_interval
    }

    /// Time to live for fanout peers (default is 60 seconds).
    pub fn fanout_ttl(&self) -> Duration {
        self.fanout_ttl
    }

    /// The number of heartbeat ticks until we recheck the connection to explicit peers and
    /// reconnect if necessary (default 300).
    pub fn check_explicit_peers_ticks(&self) -> u64 {
        self.check_explicit_peers_ticks
    }

    /// The maximum byte size for each gossipsub RPC (default is 65536 bytes).
    ///
    /// This represents the maximum size of the published message. It is additionally wrapped
    /// in a protobuf struct, so the actual wire size may be a bit larger. It must be at least
    /// large enough to support basic control messages. If Peer eXchange is enabled, this
    /// must be large enough to transmit the desired peer information on pruning. It must be at
    /// least 100 bytes.
    pub fn max_transmit_size(&self) -> usize {
        self.protocol.max_transmit_size
    }

    /// Duplicates are prevented by storing message ids of known messages in an LRU time cache.
    /// This setting sets the time period that messages are stored in the cache. Duplicates can be
    /// received if duplicate messages are sent at a time greater than this setting apart. The
    /// default is 1 minute.
    pub fn duplicate_cache_time(&self) -> Duration {
        self.duplicate_cache_time
    }

    /// When set to `true`, prevents automatic forwarding of all received messages. This setting
    /// allows a user to validate the messages before propagating them to their peers. If set to
    /// true, the user must manually call [`crate::Behaviour::report_message_validation_result()`]
    /// on the behaviour to forward a message once validated. The default is `false`.
    pub fn validate_messages(&self) -> bool {
        self.validate_messages
    }

    /// Determines the level of validation used when receiving messages. See [`ValidationMode`]
    /// for the available types. The default is ValidationMode::Strict.
    pub fn validation_mode(&self) -> &ValidationMode {
        &self.protocol.validation_mode
    }

    /// A user-defined function allowing the user to specify the message id of a gossipsub message.
    /// The default value is to concatenate the source peer id with a sequence number. Setting this
    /// parameter allows the user to address packets arbitrarily. One example is content based
    /// addressing, where this function may be set to `hash(message)`. This would prevent messages
    /// of the same content from being duplicated.
    ///
    /// This method applies the configured function to `message` and returns the resulting
    /// [`MessageId`].
    pub fn message_id(&self, message: &Message) -> MessageId {
        (self.message_id_fn)(message)
    }

    /// By default, gossipsub will reject messages that are sent to us that have the same message
    /// source as we have specified locally. Enabling this allows these messages and prevents
    /// penalizing the peer that sent us the message. Default is false.
    pub fn allow_self_origin(&self) -> bool {
        self.allow_self_origin
    }

    /// Whether Peer eXchange is enabled; this should be enabled in bootstrappers and other well
    /// connected/trusted nodes. The default is false.
    ///
    /// Note: Peer exchange is not implemented today, see
    /// <https://github.com/libp2p/rust-libp2p/issues/2398>.
    pub fn do_px(&self) -> bool {
        self.do_px
    }

    /// Controls the number of peers to include in prune Peer eXchange.
    /// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
    /// send them signed peer records for up to `prune_peers` other peers that we
    /// know of. It is recommended that this value is larger than `mesh_n_high` so that the pruned
    /// peer can reliably form a full mesh. The default is typically 16 however until signed
    /// records are spec'd this is disabled and set to 0.
    pub fn prune_peers(&self) -> usize {
        self.prune_peers
    }

    /// Controls the backoff time for pruned peers. This is how long
    /// a peer must wait before attempting to graft into our mesh again after being pruned.
    /// When pruning a peer, we send them our value of `prune_backoff` so they know
    /// the minimum time to wait. Peers running older versions may not send a backoff time,
    /// so if we receive a prune message without one, we will wait at least `prune_backoff`
    /// before attempting to re-graft. The default is one minute.
    pub fn prune_backoff(&self) -> Duration {
        self.prune_backoff
    }

    /// Controls the backoff time when unsubscribing from a topic.
    ///
    /// This is how long to wait before resubscribing to the topic. A short backoff period in case
    /// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
    /// is 10 seconds.
    pub fn unsubscribe_backoff(&self) -> Duration {
        self.unsubscribe_backoff
    }

    /// Number of heartbeat slots considered as slack for backoffs. This guarantees that we wait
    /// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
    /// solves problems occurring through high latencies. In particular if
    /// `backoff_slack * heartbeat_interval` is longer than any latencies between processing
    /// prunes on our side and processing prunes on the receiving side this guarantees that we
    /// do not get punished for grafting too early. The default is 1.
    pub fn backoff_slack(&self) -> u32 {
        self.backoff_slack
    }

    /// Whether to do flood publishing or not. If enabled newly created messages will always be
    /// sent to all peers that are subscribed to the topic and have a good enough score.
    /// The default is true.
    pub fn flood_publish(&self) -> bool {
        self.flood_publish
    }

    /// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE,
    /// then there is an extra score penalty applied to the peer through P7. The default is
    /// 10 seconds.
    pub fn graft_flood_threshold(&self) -> Duration {
        self.graft_flood_threshold
    }

    /// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec).
    /// This value must be smaller than or equal to `mesh_n / 2` and smaller than `mesh_n_low`.
    /// The default is 2.
    pub fn mesh_outbound_min(&self) -> usize {
        self.mesh_outbound_min
    }

    /// Number of heartbeat ticks that specify the interval in which opportunistic grafting is
    /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
    /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
    /// threshold (see <https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds>).
    /// The default is 60.
    pub fn opportunistic_graft_ticks(&self) -> u64 {
        self.opportunistic_graft_ticks
    }

    /// Controls how many times we will allow a peer to request the same message id through IWANT
    /// gossip before we start ignoring them. This is designed to prevent peers from spamming us
    /// with requests and wasting our resources. The default is 3.
    pub fn gossip_retransimission(&self) -> u32 {
        self.gossip_retransimission
    }

    /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2.
    pub fn opportunistic_graft_peers(&self) -> usize {
        self.opportunistic_graft_peers
    }

    /// The maximum number of messages we will process in a given RPC. If this is unset, there is
    /// no limit. The default is None.
    pub fn max_messages_per_rpc(&self) -> Option<usize> {
        self.max_messages_per_rpc
    }

    /// The maximum number of messages to include in an IHAVE message.
    /// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
    /// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
    /// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip
    /// heartbeats; with the defaults this is 1666 messages/s. The default is 5000.
    pub fn max_ihave_length(&self) -> usize {
        self.max_ihave_length
    }

    /// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer
    /// within a heartbeat. The default is 10.
    pub fn max_ihave_messages(&self) -> usize {
        self.max_ihave_messages
    }

    /// Time to wait for a message requested through IWANT following an IHAVE advertisement.
    /// If the message is not received within this window, a broken promise is declared and
    /// the router may apply behavioural penalties. The default is 3 seconds.
    pub fn iwant_followup_time(&self) -> Duration {
        self.iwant_followup_time
    }

    /// Whether support for floodsub peers is enabled, i.e. whether the floodsub protocol id is
    /// among the configured protocol ids. Default false.
    pub fn support_floodsub(&self) -> bool {
        self.protocol.protocol_ids.contains(&FLOODSUB_PROTOCOL)
    }

    /// Published message ids time cache duration. The default is 10 seconds.
    pub fn published_message_ids_cache_time(&self) -> Duration {
        self.published_message_ids_cache_time
    }

    /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
    pub fn connection_handler_queue_len(&self) -> usize {
        self.connection_handler_queue_len
    }

    /// The duration a message to be published can wait to be sent before it is abandoned. The
    /// default is 5 seconds.
    pub fn publish_queue_duration(&self) -> Duration {
        self.connection_handler_publish_duration
    }

    /// The duration a message to be forwarded can wait to be sent before it is abandoned. The
    /// default is 1 second.
    pub fn forward_queue_duration(&self) -> Duration {
        self.connection_handler_forward_duration
    }
}
377
378impl Default for Config {
379 fn default() -> Self {
380 // use ConfigBuilder to also validate defaults
381 ConfigBuilder::default()
382 .build()
383 .expect("Default config parameters should be valid parameters")
384 }
385}
386
/// The builder struct for constructing a gossipsub configuration.
///
/// Obtain one through [`ConfigBuilder::default`] or by converting an existing [`Config`] with
/// `From`, then adjust individual parameters through the chainable setter methods.
pub struct ConfigBuilder {
    /// The configuration being assembled.
    config: Config,
    /// Set when [`Self::protocol_id_prefix`] or [`Self::protocol_id`] was given a value that
    /// could not be converted into a `StreamProtocol`.
    invalid_protocol: bool, // This is a bit of a hack to only expose one error to the user.
}
392
393impl Default for ConfigBuilder {
394 fn default() -> Self {
395 ConfigBuilder {
396 config: Config {
397 protocol: ProtocolConfig::default(),
398 history_length: 5,
399 history_gossip: 3,
400 mesh_n: 6,
401 mesh_n_low: 5,
402 mesh_n_high: 12,
403 retain_scores: 4,
404 gossip_lazy: 6, // default to mesh_n
405 gossip_factor: 0.25,
406 heartbeat_initial_delay: Duration::from_secs(5),
407 heartbeat_interval: Duration::from_secs(1),
408 fanout_ttl: Duration::from_secs(60),
409 check_explicit_peers_ticks: 300,
410 duplicate_cache_time: Duration::from_secs(60),
411 validate_messages: false,
412 message_id_fn: Arc::new(|message| {
413 // default message id is: source + sequence number
414 // NOTE: If either the peer_id or source is not provided, we set to 0;
415 let mut source_string = if let Some(peer_id) = message.source.as_ref() {
416 peer_id.to_base58()
417 } else {
418 PeerId::from_bytes(&[0, 1, 0])
419 .expect("Valid peer id")
420 .to_base58()
421 };
422 source_string
423 .push_str(&message.sequence_number.unwrap_or_default().to_string());
424 MessageId::from(source_string)
425 }),
426 allow_self_origin: false,
427 do_px: false,
428 // NOTE: Increasing this currently has little effect until Signed
429 // records are implemented.
430 prune_peers: 0,
431 prune_backoff: Duration::from_secs(60),
432 unsubscribe_backoff: Duration::from_secs(10),
433 backoff_slack: 1,
434 flood_publish: true,
435 graft_flood_threshold: Duration::from_secs(10),
436 mesh_outbound_min: 2,
437 opportunistic_graft_ticks: 60,
438 opportunistic_graft_peers: 2,
439 gossip_retransimission: 3,
440 max_messages_per_rpc: None,
441 max_ihave_length: 5000,
442 max_ihave_messages: 10,
443 iwant_followup_time: Duration::from_secs(3),
444 published_message_ids_cache_time: Duration::from_secs(10),
445 connection_handler_queue_len: 5000,
446 connection_handler_publish_duration: Duration::from_secs(5),
447 connection_handler_forward_duration: Duration::from_secs(1),
448 },
449 invalid_protocol: false,
450 }
451 }
452}
453
454impl From<Config> for ConfigBuilder {
455 fn from(config: Config) -> Self {
456 ConfigBuilder {
457 config,
458 invalid_protocol: false,
459 }
460 }
461}
462
463impl ConfigBuilder {
464 /// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.1.0` and
465 /// `/meshsub/1.0.0`).
466 pub fn protocol_id_prefix(
467 &mut self,
468 protocol_id_prefix: impl Into<Cow<'static, str>>,
469 ) -> &mut Self {
470 let cow = protocol_id_prefix.into();
471
472 match (
473 StreamProtocol::try_from_owned(format!("{}/1.1.0", cow)),
474 StreamProtocol::try_from_owned(format!("{}/1.0.0", cow)),
475 ) {
476 (Ok(p1), Ok(p2)) => {
477 self.config.protocol.protocol_ids = vec![
478 ProtocolId {
479 protocol: p1,
480 kind: PeerKind::Gossipsubv1_1,
481 },
482 ProtocolId {
483 protocol: p2,
484 kind: PeerKind::Gossipsub,
485 },
486 ]
487 }
488 _ => {
489 self.invalid_protocol = true;
490 }
491 }
492
493 self
494 }
495
496 /// The full protocol id to negotiate this protocol (does not append `/1.0.0` or `/1.1.0`).
497 pub fn protocol_id(
498 &mut self,
499 protocol_id: impl Into<Cow<'static, str>>,
500 custom_id_version: Version,
501 ) -> &mut Self {
502 let cow = protocol_id.into();
503
504 match StreamProtocol::try_from_owned(cow.to_string()) {
505 Ok(protocol) => {
506 self.config.protocol.protocol_ids = vec![ProtocolId {
507 protocol,
508 kind: match custom_id_version {
509 Version::V1_1 => PeerKind::Gossipsubv1_1,
510 Version::V1_0 => PeerKind::Gossipsub,
511 },
512 }]
513 }
514 _ => {
515 self.invalid_protocol = true;
516 }
517 }
518
519 self
520 }
521
/// Sets the number of heartbeats to keep in the `memcache` (default is 5).
///
/// Returns the builder so that calls can be chained.
pub fn history_length(&mut self, history_length: usize) -> &mut Self {
    self.config.history_length = history_length;
    self
}
527
/// Sets the number of past heartbeats to gossip about (default is 3).
///
/// Returns the builder so that calls can be chained.
pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self {
    self.config.history_gossip = history_gossip;
    self
}
533
/// Sets the target number of peers for the mesh network (D in the spec, default is 6).
///
/// Returns the builder so that calls can be chained.
pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self {
    self.config.mesh_n = mesh_n;
    self
}
539
/// Sets the minimum number of peers in mesh network before adding more (D_lo in the spec,
/// default is 5).
///
/// Returns the builder so that calls can be chained.
pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self {
    self.config.mesh_n_low = mesh_n_low;
    self
}
545
/// Sets the maximum number of peers in mesh network before removing some (D_high in the spec,
/// default is 12).
///
/// Returns the builder so that calls can be chained.
pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self {
    self.config.mesh_n_high = mesh_n_high;
    self
}
552
/// Affects how peers are selected when pruning a mesh due to over subscription.
///
/// At least [`Self::retain_scores`] of the retained peers will be high-scoring, while the
/// remainder are chosen randomly (D_score in the spec, default is 4).
///
/// Returns the builder so that calls can be chained.
pub fn retain_scores(&mut self, retain_scores: usize) -> &mut Self {
    self.config.retain_scores = retain_scores;
    self
}
561
/// Sets the minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec,
/// default is 6).
///
/// Returns the builder so that calls can be chained.
pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self {
    self.config.gossip_lazy = gossip_lazy;
    self
}
568
/// Affects how many peers we will emit gossip to at each heartbeat.
///
/// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or
/// `gossip_lazy`, whichever is greater. The default is 0.25.
///
/// Returns the builder so that calls can be chained.
pub fn gossip_factor(&mut self, gossip_factor: f64) -> &mut Self {
    self.config.gossip_factor = gossip_factor;
    self
}
577
/// Sets the initial delay in each heartbeat (default is 5 seconds).
///
/// Returns the builder so that calls can be chained.
pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self {
    self.config.heartbeat_initial_delay = heartbeat_initial_delay;
    self
}
583
/// Sets the time between each heartbeat (default is 1 second).
///
/// Returns the builder so that calls can be chained.
pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self {
    self.config.heartbeat_interval = heartbeat_interval;
    self
}
589
/// Sets the number of heartbeat ticks until we recheck the connection to explicit peers and
/// reconnect if necessary (default 300).
///
/// Returns the builder so that calls can be chained.
pub fn check_explicit_peers_ticks(&mut self, check_explicit_peers_ticks: u64) -> &mut Self {
    self.config.check_explicit_peers_ticks = check_explicit_peers_ticks;
    self
}
596
/// Sets the time to live for fanout peers (default is 60 seconds).
///
/// Returns the builder so that calls can be chained.
pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self {
    self.config.fanout_ttl = fanout_ttl;
    self
}
602
/// Sets the maximum byte size for each gossipsub RPC (default is 65536 bytes, as documented
/// on [`Config::max_transmit_size`]).
///
/// The value is stored in the protocol-level configuration. Returns the builder so that calls
/// can be chained.
pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self {
    self.config.protocol.max_transmit_size = max_transmit_size;
    self
}
608
/// Duplicates are prevented by storing message ids of known messages in an LRU time cache.
/// This setting sets the time period that messages are stored in the cache. Duplicates can be
/// received if duplicate messages are sent at a time greater than this setting apart. The
/// default is 1 minute.
///
/// Note that `cache_size` is a duration, not an entry count. Returns the builder so that calls
/// can be chained.
pub fn duplicate_cache_time(&mut self, cache_size: Duration) -> &mut Self {
    self.config.duplicate_cache_time = cache_size;
    self
}
617
/// When set, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set,
/// the user must manually call [`crate::Behaviour::report_message_validation_result()`] on the
/// behaviour to forward a message once validated.
///
/// This method takes no argument: calling it turns the setting on, and the builder's default
/// is off (`false`). Returns the builder so that calls can be chained.
pub fn validate_messages(&mut self) -> &mut Self {
    self.config.validate_messages = true;
    self
}
626
/// Determines the level of validation used when receiving messages. See [`ValidationMode`]
/// for the available types. The default is ValidationMode::Strict.
///
/// The value is stored in the protocol-level configuration. Returns the builder so that calls
/// can be chained.
pub fn validation_mode(&mut self, validation_mode: ValidationMode) -> &mut Self {
    self.config.protocol.validation_mode = validation_mode;
    self
}
633
/// A user-defined function allowing the user to specify the message id of a gossipsub message.
/// The default value is to concatenate the source peer id with a sequence number. Setting this
/// parameter allows the user to address packets arbitrarily. One example is content based
/// addressing, where this function may be set to `hash(message)`. This would prevent messages
/// of the same content from being duplicated.
///
/// The function takes a [`Message`] as input and returns the [`MessageId`] to use for it. It
/// replaces any previously configured function.
///
/// Returns the builder so that calls can be chained.
pub fn message_id_fn<F>(&mut self, id_fn: F) -> &mut Self
where
    F: Fn(&Message) -> MessageId + Send + Sync + 'static,
{
    self.config.message_id_fn = Arc::new(id_fn);
    self
}
649
/// Enables Peer eXchange. This should be enabled in bootstrappers and other well
/// connected/trusted nodes. The default is false.
///
/// This method takes no argument: calling it turns the setting on.
///
/// Note: Peer exchange is not implemented today, see
/// <https://github.com/libp2p/rust-libp2p/issues/2398>.
pub fn do_px(&mut self) -> &mut Self {
    self.config.do_px = true;
    self
}
659
/// Controls the number of peers to include in prune Peer eXchange.
///
/// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
/// send them signed peer records for up to [`Self::prune_peers`] other peers that we
/// know of. It is recommended that this value is larger than [`Self::mesh_n_high`] so that the
/// pruned peer can reliably form a full mesh. The default is typically 16 however until signed
/// records are spec'd this is disabled and set to 0.
pub fn prune_peers(&mut self, prune_peers: usize) -> &mut Self {
    self.config.prune_peers = prune_peers;
    self
}
670
/// Controls the backoff time for pruned peers. This is how long
/// a peer must wait before attempting to graft into our mesh again after being pruned.
/// When pruning a peer, we send them our value of [`Self::prune_backoff`] so they know
/// the minimum time to wait. Peers running older versions may not send a backoff time,
/// so if we receive a prune message without one, we will wait at least [`Self::prune_backoff`]
/// before attempting to re-graft. The default is one minute.
///
/// Returns the builder so that calls can be chained.
pub fn prune_backoff(&mut self, prune_backoff: Duration) -> &mut Self {
    self.config.prune_backoff = prune_backoff;
    self
}
681
/// Controls the backoff time when unsubscribing from a topic.
///
/// This is how long to wait before resubscribing to the topic. A short backoff period in case
/// of an unsubscribe event allows reaching a healthy mesh in a more timely manner. The default
/// is 10 seconds.
///
/// NOTE: unlike the other duration setters on this builder, `unsubscribe_backoff` is given as
/// a number of whole seconds and converted with `Duration::from_secs`.
pub fn unsubscribe_backoff(&mut self, unsubscribe_backoff: u64) -> &mut Self {
    self.config.unsubscribe_backoff = Duration::from_secs(unsubscribe_backoff);
    self
}
691
/// Number of heartbeat slots considered as slack for backoffs. This guarantees that we wait
/// at least backoff_slack heartbeats after a backoff is over before we try to graft. This
/// solves problems occurring through high latencies. In particular if
/// `backoff_slack * heartbeat_interval` is longer than any latencies between processing
/// prunes on our side and processing prunes on the receiving side this guarantees that we
/// do not get punished for grafting too early. The default is 1.
///
/// Returns the builder so that calls can be chained.
pub fn backoff_slack(&mut self, backoff_slack: u32) -> &mut Self {
    self.config.backoff_slack = backoff_slack;
    self
}
702
/// Whether to do flood publishing or not. If enabled newly created messages will always be
/// sent to all peers that are subscribed to the topic and have a good enough score.
/// The default is true.
///
/// Returns the builder so that calls can be chained.
pub fn flood_publish(&mut self, flood_publish: bool) -> &mut Self {
    self.config.flood_publish = flood_publish;
    self
}
710
/// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE,
/// then there is an extra score penalty applied to the peer through P7. The default is
/// 10 seconds.
///
/// Returns the builder so that calls can be chained.
pub fn graft_flood_threshold(&mut self, graft_flood_threshold: Duration) -> &mut Self {
    self.config.graft_flood_threshold = graft_flood_threshold;
    self
}
717
/// Minimum number of outbound peers in the mesh network before adding more (D_out in the spec).
/// This value must be smaller than or equal to `mesh_n / 2` and smaller than `mesh_n_low`.
/// The default is 2.
///
/// This setter stores the value without checking those constraints. Returns the builder so
/// that calls can be chained.
pub fn mesh_outbound_min(&mut self, mesh_outbound_min: usize) -> &mut Self {
    self.config.mesh_outbound_min = mesh_outbound_min;
    self
}
725
/// Number of heartbeat ticks that specify the interval in which opportunistic grafting is
/// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
/// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
/// threshold (see <https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds>).
/// The default is 60.
///
/// Returns the builder so that calls can be chained.
pub fn opportunistic_graft_ticks(&mut self, opportunistic_graft_ticks: u64) -> &mut Self {
    self.config.opportunistic_graft_ticks = opportunistic_graft_ticks;
    self
}
735
/// Controls how many times we will allow a peer to request the same message id through IWANT
/// gossip before we start ignoring them. This is designed to prevent peers from spamming us
/// with requests and wasting our resources. The default is 3.
///
/// Returns the builder so that calls can be chained.
pub fn gossip_retransimission(&mut self, gossip_retransimission: u32) -> &mut Self {
    self.config.gossip_retransimission = gossip_retransimission;
    self
}
743
/// Sets the maximum number of new peers to graft to during opportunistic grafting. The
/// default is 2.
///
/// Returns the builder so that calls can be chained.
pub fn opportunistic_graft_peers(&mut self, opportunistic_graft_peers: usize) -> &mut Self {
    self.config.opportunistic_graft_peers = opportunistic_graft_peers;
    self
}
749
/// Sets the maximum number of messages we will process in a given RPC. Passing `None` means
/// there is no limit. The default is None.
///
/// Returns the builder so that calls can be chained.
pub fn max_messages_per_rpc(&mut self, max: Option<usize>) -> &mut Self {
    self.config.max_messages_per_rpc = max;
    self
}
756
/// The maximum number of messages to include in an IHAVE message.
/// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
/// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
/// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip
/// heartbeats; with the defaults this is 1666 messages/s. The default is 5000.
///
/// Returns the builder so that calls can be chained.
pub fn max_ihave_length(&mut self, max_ihave_length: usize) -> &mut Self {
    self.config.max_ihave_length = max_ihave_length;
    self
}
766
/// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer
/// within a heartbeat. The default is 10.
///
/// Returns the builder so that calls can be chained.
pub fn max_ihave_messages(&mut self, max_ihave_messages: usize) -> &mut Self {
    self.config.max_ihave_messages = max_ihave_messages;
    self
}
773
/// By default, gossipsub will reject messages that are sent to us that have the same message
/// source as we have specified locally. Enabling this allows these messages and prevents
/// penalizing the peer that sent us the message. Default is false.
///
/// Returns the builder so that calls can be chained.
pub fn allow_self_origin(&mut self, allow_self_origin: bool) -> &mut Self {
    self.config.allow_self_origin = allow_self_origin;
    self
}
781
782 /// Time to wait for a message requested through IWANT following an IHAVE advertisement.
783 /// If the message is not received within this window, a broken promise is declared and
784 /// the router may apply behavioural penalties. The default is 3 seconds.
785 pub fn iwant_followup_time(&mut self, iwant_followup_time: Duration) -> &mut Self {
786 self.config.iwant_followup_time = iwant_followup_time;
787 self
788 }
789
790 /// Enable support for flooodsub peers.
791 pub fn support_floodsub(&mut self) -> &mut Self {
792 if self
793 .config
794 .protocol
795 .protocol_ids
796 .contains(&FLOODSUB_PROTOCOL)
797 {
798 return self;
799 }
800
801 self.config.protocol.protocol_ids.push(FLOODSUB_PROTOCOL);
802 self
803 }
804
805 /// Published message ids time cache duration. The default is 10 seconds.
806 pub fn published_message_ids_cache_time(
807 &mut self,
808 published_message_ids_cache_time: Duration,
809 ) -> &mut Self {
810 self.config.published_message_ids_cache_time = published_message_ids_cache_time;
811 self
812 }
813
814 /// The max number of messages a `ConnectionHandler` can buffer. The default is 5000.
815 pub fn connection_handler_queue_len(&mut self, len: usize) -> &mut Self {
816 self.config.connection_handler_queue_len = len;
817 self
818 }
819
820 /// The duration a message to be published can wait to be sent before it is abandoned. The
821 /// default is 5 seconds.
822 pub fn publish_queue_duration(&mut self, duration: Duration) -> &mut Self {
823 self.config.connection_handler_publish_duration = duration;
824 self
825 }
826
827 /// The duration a message to be forwarded can wait to be sent before it is abandoned. The
828 /// default is 1s.
829 pub fn forward_queue_duration(&mut self, duration: Duration) -> &mut Self {
830 self.config.connection_handler_forward_duration = duration;
831 self
832 }
833
834 /// Constructs a [`Config`] from the given configuration and validates the settings.
835 pub fn build(&self) -> Result<Config, ConfigBuilderError> {
836 // check all constraints on config
837
838 if self.config.protocol.max_transmit_size < 100 {
839 return Err(ConfigBuilderError::MaxTransmissionSizeTooSmall);
840 }
841
842 if self.config.history_length < self.config.history_gossip {
843 return Err(ConfigBuilderError::HistoryLengthTooSmall);
844 }
845
846 if !(self.config.mesh_outbound_min <= self.config.mesh_n_low
847 && self.config.mesh_n_low <= self.config.mesh_n
848 && self.config.mesh_n <= self.config.mesh_n_high)
849 {
850 return Err(ConfigBuilderError::MeshParametersInvalid);
851 }
852
853 if self.config.mesh_outbound_min * 2 > self.config.mesh_n {
854 return Err(ConfigBuilderError::MeshOutboundInvalid);
855 }
856
857 if self.config.unsubscribe_backoff.as_millis() == 0 {
858 return Err(ConfigBuilderError::UnsubscribeBackoffIsZero);
859 }
860
861 if self.invalid_protocol {
862 return Err(ConfigBuilderError::InvalidProtocol);
863 }
864
865 Ok(self.config.clone())
866 }
867}
868
869impl std::fmt::Debug for Config {
870 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
871 let mut builder = f.debug_struct("GossipsubConfig");
872 let _ = builder.field("protocol", &self.protocol);
873 let _ = builder.field("history_length", &self.history_length);
874 let _ = builder.field("history_gossip", &self.history_gossip);
875 let _ = builder.field("mesh_n", &self.mesh_n);
876 let _ = builder.field("mesh_n_low", &self.mesh_n_low);
877 let _ = builder.field("mesh_n_high", &self.mesh_n_high);
878 let _ = builder.field("retain_scores", &self.retain_scores);
879 let _ = builder.field("gossip_lazy", &self.gossip_lazy);
880 let _ = builder.field("gossip_factor", &self.gossip_factor);
881 let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay);
882 let _ = builder.field("heartbeat_interval", &self.heartbeat_interval);
883 let _ = builder.field("fanout_ttl", &self.fanout_ttl);
884 let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time);
885 let _ = builder.field("validate_messages", &self.validate_messages);
886 let _ = builder.field("allow_self_origin", &self.allow_self_origin);
887 let _ = builder.field("do_px", &self.do_px);
888 let _ = builder.field("prune_peers", &self.prune_peers);
889 let _ = builder.field("prune_backoff", &self.prune_backoff);
890 let _ = builder.field("backoff_slack", &self.backoff_slack);
891 let _ = builder.field("flood_publish", &self.flood_publish);
892 let _ = builder.field("graft_flood_threshold", &self.graft_flood_threshold);
893 let _ = builder.field("mesh_outbound_min", &self.mesh_outbound_min);
894 let _ = builder.field("opportunistic_graft_ticks", &self.opportunistic_graft_ticks);
895 let _ = builder.field("opportunistic_graft_peers", &self.opportunistic_graft_peers);
896 let _ = builder.field("max_messages_per_rpc", &self.max_messages_per_rpc);
897 let _ = builder.field("max_ihave_length", &self.max_ihave_length);
898 let _ = builder.field("max_ihave_messages", &self.max_ihave_messages);
899 let _ = builder.field("iwant_followup_time", &self.iwant_followup_time);
900 let _ = builder.field(
901 "published_message_ids_cache_time",
902 &self.published_message_ids_cache_time,
903 );
904 builder.finish()
905 }
906}
907
#[cfg(test)]
mod test {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
    };

    use libp2p_core::UpgradeInfo;

    use super::*;
    use crate::{topic::IdentityHash, Topic};

    /// A free function can be installed as the message id function.
    #[test]
    fn create_config_with_message_id_as_plain_function() {
        let config = ConfigBuilder::default()
            .message_id_fn(message_id_plain_function)
            .build()
            .unwrap();

        let result = config.message_id(&get_gossipsub_message());

        assert_eq!(result, get_expected_message_id());
    }

    /// A non-capturing closure can be installed as the message id function.
    #[test]
    fn create_config_with_message_id_as_closure() {
        let config = ConfigBuilder::default()
            .message_id_fn(|message: &Message| hash_data_with_suffix(message, 'e'))
            .build()
            .unwrap();

        let result = config.message_id(&get_gossipsub_message());

        assert_eq!(result, get_expected_message_id());
    }

    /// A closure that captures a variable can be installed as the message id function.
    #[test]
    fn create_config_with_message_id_as_closure_with_variable_capture() {
        let captured: char = 'e';

        let config = ConfigBuilder::default()
            .message_id_fn(move |message: &Message| hash_data_with_suffix(message, captured))
            .build()
            .unwrap();

        let result = config.message_id(&get_gossipsub_message());

        assert_eq!(result, get_expected_message_id());
    }

    /// A protocol id prefix yields the 1.1.0 protocol followed by the 1.0.0 protocol.
    #[test]
    fn create_config_with_protocol_id_prefix() {
        let protocol_config = ConfigBuilder::default()
            .protocol_id_prefix("/purple")
            .build()
            .unwrap()
            .protocol_config();

        let protocol_ids = protocol_config.protocol_info();

        assert_eq!(protocol_ids.len(), 2);

        assert_eq!(
            protocol_ids[0].protocol,
            StreamProtocol::new("/purple/1.1.0")
        );
        assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsubv1_1);

        assert_eq!(
            protocol_ids[1].protocol,
            StreamProtocol::new("/purple/1.0.0")
        );
        assert_eq!(protocol_ids[1].kind, PeerKind::Gossipsub);
    }

    /// A custom protocol id replaces the protocol list with exactly that id.
    #[test]
    fn create_config_with_custom_protocol_id() {
        let protocol_config = ConfigBuilder::default()
            .protocol_id("/purple", Version::V1_0)
            .build()
            .unwrap()
            .protocol_config();

        let protocol_ids = protocol_config.protocol_info();

        assert_eq!(protocol_ids.len(), 1);

        assert_eq!(protocol_ids[0].protocol, "/purple");
        assert_eq!(protocol_ids[0].kind, PeerKind::Gossipsub);
    }

    /// The fixed message fed to every message id function under test.
    fn get_gossipsub_message() -> Message {
        Message {
            source: None,
            data: vec![12, 34, 56],
            sequence_number: None,
            topic: Topic::<IdentityHash>::new("test").hash(),
        }
    }

    /// The id every message id test should observe for [`get_gossipsub_message`].
    ///
    /// Derived at run time instead of being hard-coded: the algorithm behind `DefaultHasher`
    /// is unspecified and may change between Rust releases, so pinning its output bytes would
    /// tie these tests to one particular toolchain.
    fn get_expected_message_id() -> MessageId {
        hash_data_with_suffix(&get_gossipsub_message(), 'e')
    }

    /// Plain-function flavour of the message id function used by the tests.
    fn message_id_plain_function(message: &Message) -> MessageId {
        hash_data_with_suffix(message, 'e')
    }

    /// Hashes the message data with `DefaultHasher`, renders the hash in decimal and appends
    /// `suffix` to form the message id.
    fn hash_data_with_suffix(message: &Message, suffix: char) -> MessageId {
        let mut hasher = DefaultHasher::new();
        message.data.hash(&mut hasher);
        let mut id = hasher.finish().to_string();
        id.push(suffix);
        MessageId::from(id)
    }
}