Skip to main content

soil_network/gossip/
state_machine.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use crate::gossip::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext};
8
9use ahash::AHashSet;
10use schnellru::{ByLength, LruMap};
11use soil_network::types::PeerId;
12
13use soil_prometheus::{register, Counter, PrometheusError, Registry, U64};
14use soil_network::common::role::ObservedRole;
15use soil_network::{types::ProtocolName, NotificationService};
16use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};
17use subsoil::runtime::traits::{Block as BlockT, Hash, HashingFor};
18
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
// NOTE: The current value is adjusted based on largest production network deployment (Kusama) and
// the current main gossip user (GRANDPA). Currently there are ~800 validators on Kusama, as such,
// each GRANDPA round should generate ~1600 messages, and we currently keep track of the last 2
// completed rounds and the current live one. That makes it so that at any point we will be holding
// ~4800 live messages.
//
// Assuming that each known message is tracked with a 32 byte hash (common for `Block::Hash`), then
// this cache should take about 256 KB of memory.
/// Maximum number of message hashes held in the known-messages LRU cache.
const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;

/// Delay added to "now" whenever the next periodic rebroadcast is scheduled (at construction and
/// after each rebroadcast performed by `tick`).
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);

/// NOTE(review): not referenced in this file; presumably the cadence at which the owner of the
/// state machine calls `tick` -- confirm against the caller.
pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
33
34mod rep {
35	use soil_network::ReputationChange as Rep;
36	/// Reputation change when a peer sends us a gossip message that we didn't know about.
37	pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip");
38	/// Reputation change when a peer sends us a gossip message that we already knew about.
39	pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
40}
41
/// Gossip bookkeeping kept for every connected peer.
struct PeerConsensus<H> {
	/// Hashes of the messages this peer is considered to have already: the ones we sent to it and
	/// the ones it sent to us.
	known_messages: AHashSet<H>,
}
45
/// Topic stream message with sender.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicNotification {
	/// Raw message payload.
	pub message: Vec<u8>,
	/// Peer the message was received from; `None` for messages registered locally.
	pub sender: Option<PeerId>,
}
54
/// A message retained by the state machine together with its lookup keys.
struct MessageEntry<B: BlockT> {
	/// Hash of `message`, used to track which peers already know it.
	message_hash: B::Hash,
	/// Topic the message was registered under.
	topic: B::Hash,
	/// Raw message payload.
	message: Vec<u8>,
	/// Peer the message was received from; `None` for messages registered locally.
	sender: Option<PeerId>,
}
61
/// Local implementation of `ValidatorContext`.
///
/// Bundles the gossip state machine with the notification service so that a validator callback
/// can trigger sends while it is being invoked.
struct NetworkContext<'g, 'p, B: BlockT> {
	/// State machine the validator callbacks operate on.
	gossip: &'g mut ConsensusGossip<B>,
	/// Service used to deliver the resulting notifications.
	notification_service: &'p mut Box<dyn NotificationService>,
}
67
68impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
69	/// Broadcast all messages with given topic to peers that do not have it yet.
70	fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
71		self.gossip.broadcast_topic(self.notification_service, topic, force);
72	}
73
74	/// Broadcast a message to all peers that have not received it previously.
75	fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
76		self.gossip.multicast(self.notification_service, topic, message, force);
77	}
78
79	/// Send addressed message to a peer.
80	fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
81		self.notification_service.send_sync_notification(who, message);
82	}
83
84	/// Send all messages with given topic to a peer.
85	fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
86		self.gossip.send_topic(self.notification_service, who, topic, force);
87	}
88}
89
90fn propagate<'a, B: BlockT, I>(
91	notification_service: &mut Box<dyn NotificationService>,
92	protocol: ProtocolName,
93	messages: I,
94	intent: MessageIntent,
95	peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
96	validator: &Arc<dyn Validator<B>>,
97)
98// (msg_hash, topic, message)
99where
100	I: Clone + IntoIterator<Item = (&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
101{
102	let mut message_allowed = validator.message_allowed();
103
104	for (id, ref mut peer) in peers.iter_mut() {
105		for (message_hash, topic, message) in messages.clone() {
106			let intent = match intent {
107				MessageIntent::Broadcast { .. } => {
108					if peer.known_messages.contains(message_hash) {
109						continue;
110					} else {
111						MessageIntent::Broadcast
112					}
113				},
114				MessageIntent::PeriodicRebroadcast => {
115					if peer.known_messages.contains(message_hash) {
116						MessageIntent::PeriodicRebroadcast
117					} else {
118						// peer doesn't know message, so the logic should treat it as an
119						// initial broadcast.
120						MessageIntent::Broadcast
121					}
122				},
123				other => other,
124			};
125
126			if !message_allowed(id, intent, topic, message) {
127				continue;
128			}
129
130			peer.known_messages.insert(*message_hash);
131
132			tracing::trace!(
133				target: "gossip",
134				to = %id,
135				%protocol,
136				?message,
137				"Propagating message",
138			);
139			notification_service.send_sync_notification(id, message.clone());
140		}
141	}
142}
143
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
	/// Connected peers and, for each, the message hashes it is considered to know.
	peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
	/// Messages currently retained; entries are dropped by `collect_garbage` once the validator
	/// reports them as expired.
	messages: Vec<MessageEntry<B>>,
	/// LRU cache of message hashes, bounded by `KNOWN_MESSAGES_CACHE_SIZE`.
	known_messages: LruMap<B::Hash, ()>,
	/// Protocol name, attached to log output.
	protocol: ProtocolName,
	/// Validator deciding which messages are accepted, forwarded and expired.
	validator: Arc<dyn Validator<B>>,
	/// Earliest instant at which `tick` performs the next periodic rebroadcast.
	next_broadcast: Instant,
	/// Prometheus counters; `None` when no registry was supplied or registration failed.
	metrics: Option<Metrics>,
}
154
155impl<B: BlockT> ConsensusGossip<B> {
156	/// Create a new instance using the given validator.
157	pub fn new(
158		validator: Arc<dyn Validator<B>>,
159		protocol: ProtocolName,
160		metrics_registry: Option<&Registry>,
161	) -> Self {
162		let metrics = match metrics_registry.map(Metrics::register) {
163			Some(Ok(metrics)) => Some(metrics),
164			Some(Err(e)) => {
165				tracing::debug!(target: "gossip", "Failed to register metrics: {:?}", e);
166				None
167			},
168			None => None,
169		};
170
171		ConsensusGossip {
172			peers: HashMap::new(),
173			messages: Default::default(),
174			known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
175			protocol,
176			validator,
177			next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
178			metrics,
179		}
180	}
181
182	/// Handle new connected peer.
183	pub fn new_peer(
184		&mut self,
185		notification_service: &mut Box<dyn NotificationService>,
186		who: PeerId,
187		role: ObservedRole,
188	) {
189		tracing::trace!(
190			target:"gossip",
191			%who,
192			protocol = %self.protocol,
193			?role,
194			"Registering peer",
195		);
196		self.peers.insert(who, PeerConsensus { known_messages: Default::default() });
197
198		let validator = self.validator.clone();
199		let mut context = NetworkContext { gossip: self, notification_service };
200		validator.new_peer(&mut context, &who, role);
201	}
202
203	fn register_message_hashed(
204		&mut self,
205		message_hash: B::Hash,
206		topic: B::Hash,
207		message: Vec<u8>,
208		sender: Option<PeerId>,
209	) {
210		if self.known_messages.insert(message_hash, ()) {
211			self.messages.push(MessageEntry { message_hash, topic, message, sender });
212
213			if let Some(ref metrics) = self.metrics {
214				metrics.registered_messages.inc();
215			}
216		}
217	}
218
219	/// Registers a message without propagating it to any peers. The message
220	/// becomes available to new peers or when the service is asked to gossip
221	/// the message's topic. No validation is performed on the message, if the
222	/// message is already expired it should be dropped on the next garbage
223	/// collection.
224	pub fn register_message(&mut self, topic: B::Hash, message: Vec<u8>) {
225		let message_hash = HashingFor::<B>::hash(&message[..]);
226		self.register_message_hashed(message_hash, topic, message, None);
227	}
228
229	/// Call when a peer has been disconnected to stop tracking gossip status.
230	pub fn peer_disconnected(
231		&mut self,
232		notification_service: &mut Box<dyn NotificationService>,
233		who: PeerId,
234	) {
235		let validator = self.validator.clone();
236		let mut context = NetworkContext { gossip: self, notification_service };
237		validator.peer_disconnected(&mut context, &who);
238		self.peers.remove(&who);
239	}
240
241	/// Perform periodic maintenance
242	pub fn tick(&mut self, notification_service: &mut Box<dyn NotificationService>) {
243		self.collect_garbage();
244		if Instant::now() >= self.next_broadcast {
245			self.rebroadcast(notification_service);
246			self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
247		}
248	}
249
250	/// Rebroadcast all messages to all peers.
251	fn rebroadcast(&mut self, notification_service: &mut Box<dyn NotificationService>) {
252		let messages = self
253			.messages
254			.iter()
255			.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
256
257		propagate(
258			notification_service,
259			self.protocol.clone(),
260			messages,
261			MessageIntent::PeriodicRebroadcast,
262			&mut self.peers,
263			&self.validator,
264		);
265	}
266
267	/// Broadcast all messages with given topic.
268	pub fn broadcast_topic(
269		&mut self,
270		notification_service: &mut Box<dyn NotificationService>,
271		topic: B::Hash,
272		force: bool,
273	) {
274		let messages = self.messages.iter().filter_map(|entry| {
275			if entry.topic == topic {
276				Some((&entry.message_hash, &entry.topic, &entry.message))
277			} else {
278				None
279			}
280		});
281		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
282		propagate(
283			notification_service,
284			self.protocol.clone(),
285			messages,
286			intent,
287			&mut self.peers,
288			&self.validator,
289		);
290	}
291
292	/// Prune old or no longer relevant consensus messages. Provide a predicate
293	/// for pruning, which returns `false` when the items with a given topic should be pruned.
294	pub fn collect_garbage(&mut self) {
295		let known_messages = &mut self.known_messages;
296		let before = self.messages.len();
297
298		let mut message_expired = self.validator.message_expired();
299		self.messages.retain(|entry| !message_expired(entry.topic, &entry.message));
300
301		let expired_messages = before - self.messages.len();
302
303		if let Some(ref metrics) = self.metrics {
304			metrics.expired_messages.inc_by(expired_messages as u64)
305		}
306
307		tracing::trace!(
308			target: "gossip",
309			protocol = %self.protocol,
310			"Cleaned up {} stale messages, {} left ({} known)",
311			expired_messages,
312			self.messages.len(),
313			known_messages.len(),
314		);
315
316		for (_, ref mut peer) in self.peers.iter_mut() {
317			peer.known_messages.retain(|h| known_messages.get(h).is_some());
318		}
319	}
320
321	/// Get valid messages received in the past for a topic (might have expired meanwhile).
322	pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
323		self.messages
324			.iter()
325			.filter(move |e| e.topic == topic)
326			.map(|entry| TopicNotification { message: entry.message.clone(), sender: entry.sender })
327	}
328
329	/// Register incoming messages and return the ones that are new and valid (according to a gossip
330	/// validator) and should thus be forwarded to the upper layers.
331	pub fn on_incoming(
332		&mut self,
333		network: &mut dyn Network<B>,
334		notification_service: &mut Box<dyn NotificationService>,
335		who: PeerId,
336		messages: Vec<Vec<u8>>,
337	) -> Vec<(B::Hash, TopicNotification)> {
338		let mut to_forward = vec![];
339
340		if !messages.is_empty() {
341			tracing::trace!(
342				target: "gossip",
343				messages_num = %messages.len(),
344				%who,
345				protocol = %self.protocol,
346				"Received messages from peer",
347			);
348		}
349
350		for message in messages {
351			let message_hash = HashingFor::<B>::hash(&message[..]);
352
353			if self.known_messages.get(&message_hash).is_some() {
354				tracing::trace!(
355					target: "gossip",
356					%who,
357					protocol = %self.protocol,
358					"Ignored already known message",
359				);
360
361				// If the peer already send us the message once, let's report them.
362				if self
363					.peers
364					.get_mut(&who)
365					.map_or(false, |p| !p.known_messages.insert(message_hash))
366				{
367					network.report_peer(who, rep::DUPLICATE_GOSSIP);
368				}
369				continue;
370			}
371
372			// validate the message
373			let validation = {
374				let validator = self.validator.clone();
375				let mut context = NetworkContext { gossip: self, notification_service };
376				validator.validate(&mut context, &who, &message)
377			};
378
379			let (topic, keep) = match validation {
380				ValidationResult::ProcessAndKeep(topic) => (topic, true),
381				ValidationResult::ProcessAndDiscard(topic) => (topic, false),
382				ValidationResult::Discard => {
383					tracing::trace!(
384						target: "gossip",
385						%who,
386						protocol = %self.protocol,
387						"Discard message from peer",
388					);
389					continue;
390				},
391			};
392
393			let peer = match self.peers.get_mut(&who) {
394				Some(peer) => peer,
395				None => {
396					tracing::error!(
397						target: "gossip",
398						%who,
399						protocol = %self.protocol,
400						"Got message from unregistered peer",
401					);
402					continue;
403				},
404			};
405
406			network.report_peer(who, rep::GOSSIP_SUCCESS);
407			peer.known_messages.insert(message_hash);
408			to_forward
409				.push((topic, TopicNotification { message: message.clone(), sender: Some(who) }));
410
411			if keep {
412				self.register_message_hashed(message_hash, topic, message, Some(who));
413			}
414		}
415
416		to_forward
417	}
418
419	/// Send all messages with given topic to a peer.
420	pub fn send_topic(
421		&mut self,
422		notification_service: &mut Box<dyn NotificationService>,
423		who: &PeerId,
424		topic: B::Hash,
425		force: bool,
426	) {
427		let mut message_allowed = self.validator.message_allowed();
428
429		if let Some(ref mut peer) = self.peers.get_mut(who) {
430			for entry in self.messages.iter().filter(|m| m.topic == topic) {
431				let intent =
432					if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
433
434				if !force && peer.known_messages.contains(&entry.message_hash) {
435					continue;
436				}
437
438				if !message_allowed(who, intent, &entry.topic, &entry.message) {
439					continue;
440				}
441
442				peer.known_messages.insert(entry.message_hash);
443
444				tracing::trace!(
445					target: "gossip",
446					to = %who,
447					protocol = %self.protocol,
448					?entry.message,
449					"Sending topic message",
450				);
451				notification_service.send_sync_notification(who, entry.message.clone());
452			}
453		}
454	}
455
456	/// Multicast a message to all peers.
457	pub fn multicast(
458		&mut self,
459		notification_service: &mut Box<dyn NotificationService>,
460		topic: B::Hash,
461		message: Vec<u8>,
462		force: bool,
463	) {
464		let message_hash = HashingFor::<B>::hash(&message);
465		self.register_message_hashed(message_hash, topic, message.clone(), None);
466		let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
467		propagate(
468			notification_service,
469			self.protocol.clone(),
470			iter::once((&message_hash, &topic, &message)),
471			intent,
472			&mut self.peers,
473			&self.validator,
474		);
475	}
476
477	/// Send addressed message to a peer. The message is not kept or multicast
478	/// later on.
479	pub fn send_message(
480		&mut self,
481		notification_service: &mut Box<dyn NotificationService>,
482		who: &PeerId,
483		message: Vec<u8>,
484	) {
485		let peer = match self.peers.get_mut(who) {
486			None => return,
487			Some(peer) => peer,
488		};
489
490		let message_hash = HashingFor::<B>::hash(&message);
491
492		tracing::trace!(
493			target: "gossip",
494			to = %who,
495			protocol = %self.protocol,
496			?message,
497			"Sending direct message",
498		);
499
500		peer.known_messages.insert(message_hash);
501		notification_service.send_sync_notification(who, message)
502	}
503}
504
/// Prometheus counters exposed by the gossip state machine.
struct Metrics {
	/// Incremented each time a message is stored by `register_message_hashed`.
	registered_messages: Counter<U64>,
	/// Increased by the number of messages removed in each `collect_garbage` run.
	expired_messages: Counter<U64>,
}
509
510impl Metrics {
511	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
512		Ok(Self {
513			registered_messages: register(
514				Counter::new(
515					"substrate_network_gossip_registered_messages_total",
516					"Number of registered messages by the gossip service.",
517				)?,
518				registry,
519			)?,
520			expired_messages: register(
521				Counter::new(
522					"substrate_network_gossip_expired_messages_total",
523					"Number of expired messages by the gossip service.",
524				)?,
525				registry,
526			)?,
527		})
528	}
529}
530
531#[cfg(test)]
532mod tests {
533	use super::*;
534	use futures::prelude::*;
535	use soil_network::types::multiaddr::Multiaddr;
536	use soil_network::{
537		config::MultiaddrWithPeerId, event::Event, service::traits::NotificationEvent, MessageSink,
538		NetworkBlock, NetworkEventStream, NetworkPeers, ReputationChange,
539	};
540	use std::{
541		collections::HashSet,
542		pin::Pin,
543		sync::{Arc, Mutex},
544	};
545	use subsoil::runtime::{
546		testing::{Block as RawBlock, MockCallU64, TestXt, H256},
547		traits::NumberFor,
548	};
549
550	type Block = RawBlock<TestXt<MockCallU64, ()>>;
551
	// Test helper: stores message `$m` under `$topic` with an explicitly chosen hash `$hash`
	// (instead of the real hash of the payload) and no sender, mirroring what
	// `register_message_hashed` does minus the metrics update.
	macro_rules! push_msg {
		($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
			if $consensus.known_messages.insert($hash, ()) {
				$consensus.messages.push(MessageEntry {
					message_hash: $hash,
					topic: $topic,
					message: $m,
					sender: None,
				});
			}
		};
	}
564
	/// Validator that accepts and keeps every message, filing all of them under the default
	/// (all-zero) topic.
	struct AllowAll;
	impl Validator<Block> for AllowAll {
		fn validate(
			&self,
			_context: &mut dyn ValidatorContext<Block>,
			_sender: &PeerId,
			_data: &[u8],
		) -> ValidationResult<H256> {
			ValidationResult::ProcessAndKeep(H256::default())
		}
	}
576
	/// Validator that rejects every message.
	struct DiscardAll;
	impl Validator<Block> for DiscardAll {
		fn validate(
			&self,
			_context: &mut dyn ValidatorContext<Block>,
			_sender: &PeerId,
			_data: &[u8],
		) -> ValidationResult<H256> {
			ValidationResult::Discard
		}
	}
588
	/// Network mock whose only working feature is recording reputation reports.
	#[derive(Clone, Default)]
	struct NoOpNetwork {
		/// Shared, lockable state so that clones of the mock observe the same reports.
		inner: Arc<Mutex<NoOpNetworkInner>>,
	}

	/// State recorded by `NoOpNetwork`.
	#[derive(Clone, Default)]
	struct NoOpNetworkInner {
		/// Every `(peer, reputation change)` passed to `report_peer`, in call order.
		peer_reports: Vec<(PeerId, ReputationChange)>,
	}
598
	// Only `report_peer` (records the report) and `peer_role` (always `None`) are usable; every
	// other method panics through `unimplemented!()` when called.
	#[async_trait::async_trait]
	impl NetworkPeers for NoOpNetwork {
		fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
			unimplemented!();
		}

		fn set_authorized_only(&self, _reserved_only: bool) {
			unimplemented!();
		}

		fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
			unimplemented!();
		}

		/// Append the report to `peer_reports` so that tests can assert on it.
		fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
			self.inner.lock().unwrap().peer_reports.push((peer_id, cost_benefit));
		}

		fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
			unimplemented!()
		}

		fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
			unimplemented!();
		}

		fn accept_unreserved_peers(&self) {
			unimplemented!();
		}

		fn deny_unreserved_peers(&self) {
			unimplemented!();
		}

		fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
			unimplemented!();
		}

		fn remove_reserved_peer(&self, _peer_id: PeerId) {
			unimplemented!();
		}

		fn set_reserved_peers(
			&self,
			_protocol: ProtocolName,
			_peers: HashSet<Multiaddr>,
		) -> Result<(), String> {
			unimplemented!();
		}

		fn add_peers_to_reserved_set(
			&self,
			_protocol: ProtocolName,
			_peers: HashSet<Multiaddr>,
		) -> Result<(), String> {
			unimplemented!();
		}

		fn remove_peers_from_reserved_set(
			&self,
			_protocol: ProtocolName,
			_peers: Vec<PeerId>,
		) -> Result<(), String> {
			unimplemented!();
		}

		fn sync_num_connected(&self) -> usize {
			unimplemented!();
		}

		/// The mock never resolves a role.
		fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
			None
		}

		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
			unimplemented!();
		}
	}
677
	// Present only to satisfy the trait; calling `event_stream` panics via `unimplemented!()`.
	impl NetworkEventStream for NoOpNetwork {
		fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
			unimplemented!();
		}
	}
683
	// Present only to satisfy the trait; both methods panic via `unimplemented!()` when called.
	impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
		fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
			unimplemented!();
		}

		fn new_best_block_imported(
			&self,
			_hash: <Block as BlockT>::Hash,
			_number: NumberFor<Block>,
		) {
			unimplemented!();
		}
	}
697
	/// Notification service mock for tests that must not send anything.
	///
	/// Every method except `next_event` panics via `unimplemented!()`, so a test that causes a
	/// notification to be sent fails loudly.
	#[derive(Debug, Default)]
	struct NoOpNotificationService {}

	#[async_trait::async_trait]
	impl NotificationService for NoOpNotificationService {
		/// Instruct `Notifications` to open a new substream for `peer`.
		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!();
		}

		/// Instruct `Notifications` to close substream for `peer`.
		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!();
		}

		/// Send synchronous `notification` to `peer`.
		fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
			unimplemented!();
		}

		/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
		async fn send_async_notification(
			&mut self,
			_peer: &PeerId,
			_notification: Vec<u8>,
		) -> Result<(), soil_network::error::Error> {
			unimplemented!();
		}

		/// Set handshake for the notification protocol replacing the old handshake.
		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!();
		}

		/// Non-async variant of `set_handshake`; unimplemented in this mock.
		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!();
		}

		/// Get next event from the `Notifications` event stream.
		///
		/// The mock never produces an event.
		async fn next_event(&mut self) -> Option<NotificationEvent> {
			None
		}

		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
			unimplemented!();
		}

		fn protocol(&self) -> &ProtocolName {
			unimplemented!();
		}

		fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
			unimplemented!();
		}
	}
753
754	#[test]
755	fn collects_garbage() {
756		struct AllowOne;
757		impl Validator<Block> for AllowOne {
758			fn validate(
759				&self,
760				_context: &mut dyn ValidatorContext<Block>,
761				_sender: &PeerId,
762				data: &[u8],
763			) -> ValidationResult<H256> {
764				if data[0] == 1 {
765					ValidationResult::ProcessAndKeep(H256::default())
766				} else {
767					ValidationResult::Discard
768				}
769			}
770
771			fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
772				Box::new(move |_topic, data| data[0] != 1)
773			}
774		}
775
776		let prev_hash = H256::random();
777		let best_hash = H256::random();
778		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
779		let m1_hash = H256::random();
780		let m2_hash = H256::random();
781		let m1 = vec![1, 2, 3];
782		let m2 = vec![4, 5, 6];
783
784		push_msg!(consensus, prev_hash, m1_hash, m1);
785		push_msg!(consensus, best_hash, m2_hash, m2);
786		consensus.known_messages.insert(m1_hash, ());
787		consensus.known_messages.insert(m2_hash, ());
788
789		consensus.collect_garbage();
790		assert_eq!(consensus.messages.len(), 2);
791		assert_eq!(consensus.known_messages.len(), 2);
792
793		consensus.validator = Arc::new(AllowOne);
794
795		// m2 is expired
796		consensus.collect_garbage();
797		assert_eq!(consensus.messages.len(), 1);
798		// known messages are only pruned based on size.
799		assert_eq!(consensus.known_messages.len(), 2);
800		assert!(consensus.known_messages.get(&m2_hash).is_some());
801	}
802
803	#[test]
804	fn message_stream_include_those_sent_before_asking() {
805		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
806
807		// Register message.
808		let message = vec![4, 5, 6];
809		let topic = HashingFor::<Block>::hash(&[1, 2, 3]);
810		consensus.register_message(topic, message.clone());
811
812		assert_eq!(
813			consensus.messages_for(topic).next(),
814			Some(TopicNotification { message, sender: None }),
815		);
816	}
817
818	#[test]
819	fn can_keep_multiple_messages_per_topic() {
820		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
821
822		let topic = [1; 32].into();
823		let msg_a = vec![1, 2, 3];
824		let msg_b = vec![4, 5, 6];
825
826		consensus.register_message(topic, msg_a);
827		consensus.register_message(topic, msg_b);
828
829		assert_eq!(consensus.messages.len(), 2);
830	}
831
832	#[test]
833	fn peer_is_removed_on_disconnect() {
834		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
835
836		let mut notification_service: Box<dyn NotificationService> =
837			Box::new(NoOpNotificationService::default());
838
839		let peer_id = PeerId::random();
840		consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
841		assert!(consensus.peers.contains_key(&peer_id));
842
843		consensus.peer_disconnected(&mut notification_service, peer_id);
844		assert!(!consensus.peers.contains_key(&peer_id));
845	}
846
847	#[test]
848	fn on_incoming_ignores_discarded_messages() {
849		let mut notification_service: Box<dyn NotificationService> =
850			Box::new(NoOpNotificationService::default());
851		let to_forward = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None)
852			.on_incoming(
853				&mut NoOpNetwork::default(),
854				&mut notification_service,
855				PeerId::random(),
856				vec![vec![1, 2, 3]],
857			);
858
859		assert!(
860			to_forward.is_empty(),
861			"Expected `on_incoming` to ignore discarded message but got {:?}",
862			to_forward,
863		);
864	}
865
866	#[test]
867	fn on_incoming_ignores_unregistered_peer() {
868		let mut network = NoOpNetwork::default();
869		let mut notification_service: Box<dyn NotificationService> =
870			Box::new(NoOpNotificationService::default());
871		let remote = PeerId::random();
872
873		let to_forward = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None)
874			.on_incoming(
875				&mut network,
876				&mut notification_service,
877				// Unregistered peer.
878				remote,
879				vec![vec![1, 2, 3]],
880			);
881
882		assert!(
883			to_forward.is_empty(),
884			"Expected `on_incoming` to ignore message from unregistered peer but got {:?}",
885			to_forward,
886		);
887	}
888
889	// Two peers can send us the same gossip message. We should not report the second peer
890	// sending the gossip message as long as its the first time the peer send us this message.
891	#[test]
892	fn do_not_report_peer_for_first_time_duplicate_gossip_message() {
893		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
894
895		let mut network = NoOpNetwork::default();
896		let mut notification_service: Box<dyn NotificationService> =
897			Box::new(NoOpNotificationService::default());
898
899		let peer_id = PeerId::random();
900		consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
901		assert!(consensus.peers.contains_key(&peer_id));
902
903		let peer_id2 = PeerId::random();
904		consensus.new_peer(&mut notification_service, peer_id2, ObservedRole::Full);
905		assert!(consensus.peers.contains_key(&peer_id2));
906
907		let message = vec![vec![1, 2, 3]];
908		consensus.on_incoming(&mut network, &mut notification_service, peer_id, message.clone());
909		consensus.on_incoming(&mut network, &mut notification_service, peer_id2, message.clone());
910
911		assert_eq!(
912			vec![(peer_id, rep::GOSSIP_SUCCESS)],
913			network.inner.lock().unwrap().peer_reports
914		);
915	}
916}