// soil_network/gossip/bridge.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use crate::gossip::{
8	state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL},
9	Network, Syncing, Validator,
10};
11
12use soil_network::sync::SyncEvent;
13use soil_network::{
14	service::traits::{NotificationEvent, ValidationResult},
15	types::ProtocolName,
16	NotificationService, ReputationChange,
17};
18
19use futures::{
20	channel::mpsc::{channel, Receiver, Sender},
21	prelude::*,
22};
23use log::trace;
24use soil_prometheus::Registry;
25use soil_network::types::PeerId;
26use std::{
27	collections::{HashMap, VecDeque},
28	pin::Pin,
29	sync::Arc,
30	task::{Context, Poll},
31};
32use subsoil::runtime::traits::Block as BlockT;
33
/// Wraps around an implementation of the [`Network`] trait and provides gossiping capabilities on
/// top of it.
pub struct GossipEngine<B: BlockT> {
	/// Gossip state machine tracking peers, known messages and topics.
	state_machine: ConsensusGossip<B>,
	/// Handle to the underlying network, used e.g. for peer reputation reports
	/// and reserved-set management.
	network: Box<dyn Network<B> + Send>,
	/// Handle to the syncing service, used for block announcements.
	sync: Box<dyn Syncing<B>>,
	/// Timer driving the periodic `state_machine.tick()` maintenance call.
	periodic_maintenance_interval: futures_timer::Delay,
	/// Name of the notification protocol this engine gossips on.
	protocol: ProtocolName,

	/// Incoming events from the syncing service.
	sync_event_stream: Pin<Box<dyn Stream<Item = SyncEvent> + Send>>,
	/// Handle for polling notification-related events.
	notification_service: Box<dyn NotificationService>,
	/// Outgoing events to the consumer, one list of sinks per subscribed topic.
	message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
	/// Buffered messages (see [`ForwardingState`]).
	forwarding_state: ForwardingState<B>,

	/// Set once either event stream terminates; reported through the
	/// [`futures::future::FusedFuture`] implementation.
	is_terminated: bool,
}
54
/// A gossip engine receives messages from the network via the `network_event_stream` and forwards
/// them to upper layers via the `message_sinks`. In the scenario where messages have been received
/// from the network but a subscribed message sink is not yet ready to receive the messages, the
/// messages are buffered. To model this process a gossip engine can be in two states.
enum ForwardingState<B: BlockT> {
	/// The gossip engine is currently not forwarding any messages and will poll the network for
	/// more messages to forward.
	Idle,
	/// The gossip engine is in the progress of forwarding messages and thus will not poll the
	/// network for more messages until it has sent all current messages into the subscribed
	/// message sinks.
	Busy(VecDeque<(B::Hash, TopicNotification)>),
}
68
// Allows callers (e.g. the tests below) to poll the engine via `poll_unpin`
// without pinning it first.
impl<B: BlockT> Unpin for GossipEngine<B> {}
70
71impl<B: BlockT> GossipEngine<B> {
72	/// Create a new instance.
73	pub fn new<N, S>(
74		network: N,
75		sync: S,
76		notification_service: Box<dyn NotificationService>,
77		protocol: impl Into<ProtocolName>,
78		validator: Arc<dyn Validator<B>>,
79		metrics_registry: Option<&Registry>,
80	) -> Self
81	where
82		B: 'static,
83		N: Network<B> + Send + Clone + 'static,
84		S: Syncing<B> + Send + Clone + 'static,
85	{
86		let protocol = protocol.into();
87		let sync_event_stream = sync.event_stream("network-gossip");
88
89		GossipEngine {
90			state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
91			network: Box::new(network),
92			sync: Box::new(sync),
93			notification_service,
94			periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
95			protocol,
96
97			sync_event_stream,
98			message_sinks: HashMap::new(),
99			forwarding_state: ForwardingState::Idle,
100
101			is_terminated: false,
102		}
103	}
104
105	pub fn report(&self, who: PeerId, reputation: ReputationChange) {
106		self.network.report_peer(who, reputation);
107	}
108
109	/// Registers a message without propagating it to any peers. The message
110	/// becomes available to new peers or when the service is asked to gossip
111	/// the message's topic. No validation is performed on the message, if the
112	/// message is already expired it should be dropped on the next garbage
113	/// collection.
114	pub fn register_gossip_message(&mut self, topic: B::Hash, message: Vec<u8>) {
115		self.state_machine.register_message(topic, message);
116	}
117
118	/// Broadcast all messages with given topic.
119	pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
120		self.state_machine.broadcast_topic(&mut self.notification_service, topic, force);
121	}
122
123	/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
124	pub fn messages_for(&mut self, topic: B::Hash) -> Receiver<TopicNotification> {
125		let past_messages = self.state_machine.messages_for(topic).collect::<Vec<_>>();
126		// The channel length is not critical for correctness. By the implementation of `channel`
127		// each sender is guaranteed a single buffer slot, making it a non-rendezvous channel and
128		// thus preventing direct dead-locks. A minimum channel length of 10 is an estimate based on
129		// the fact that despite `NotificationsReceived` having a `Vec` of messages, it only ever
130		// contains a single message.
131		let (mut tx, rx) = channel(usize::max(past_messages.len(), 10));
132
133		for notification in past_messages {
134			tx.try_send(notification)
135				.expect("receiver known to be live, and buffer size known to suffice; qed");
136		}
137
138		self.message_sinks.entry(topic).or_default().push(tx);
139
140		rx
141	}
142
143	/// Send all messages with given topic to a peer.
144	pub fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
145		self.state_machine.send_topic(&mut self.notification_service, who, topic, force)
146	}
147
148	/// Multicast a message to all peers.
149	pub fn gossip_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
150		self.state_machine
151			.multicast(&mut self.notification_service, topic, message, force)
152	}
153
154	/// Send addressed message to the given peers. The message is not kept or multicast
155	/// later on.
156	pub fn send_message(&mut self, who: Vec<PeerId>, data: Vec<u8>) {
157		for who in &who {
158			self.state_machine
159				.send_message(&mut self.notification_service, who, data.clone());
160		}
161	}
162
163	/// Notify everyone we're connected to that we have the given block.
164	///
165	/// Note: this method isn't strictly related to gossiping and should eventually be moved
166	/// somewhere else.
167	pub fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
168		self.sync.announce_block(block, associated_data);
169	}
170
171	/// Consume [`GossipEngine`] and return the notification service.
172	pub fn take_notification_service(self) -> Box<dyn NotificationService> {
173		self.notification_service
174	}
175}
176
impl<B: BlockT> Future for GossipEngine<B> {
	type Output = ();

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = &mut *self;

		'outer: loop {
			match &mut this.forwarding_state {
				ForwardingState::Idle => {
					// Poll both input streams unconditionally so that wake-ups from
					// either one are registered with the current waker before parking.
					let next_notification_event =
						this.notification_service.next_event().poll_unpin(cx);
					let sync_event_stream = this.sync_event_stream.poll_next_unpin(cx);

					if next_notification_event.is_pending() && sync_event_stream.is_pending() {
						break;
					}

					match next_notification_event {
						Poll::Ready(Some(event)) => match event {
							NotificationEvent::ValidateInboundSubstream {
								peer,
								handshake,
								result_tx,
								..
							} => {
								// only accept peers whose role can be determined
								let result = this
									.network
									.peer_role(peer, handshake)
									.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
								// Receiver may already be gone; nothing to do then.
								let _ = result_tx.send(result);
							},
							NotificationEvent::NotificationStreamOpened {
								peer, handshake, ..
							} => {
								if let Some(role) = this.network.peer_role(peer, handshake) {
									this.state_machine.new_peer(
										&mut this.notification_service,
										peer,
										role,
									);
								} else {
									log::debug!(target: "gossip", "role for {peer} couldn't be determined");
								}
							},
							NotificationEvent::NotificationStreamClosed { peer } => {
								this.state_machine
									.peer_disconnected(&mut this.notification_service, peer);
							},
							NotificationEvent::NotificationReceived { peer, notification } => {
								// Validate the message and switch to `Busy` to forward the
								// accepted messages to subscribers before polling for more.
								let to_forward = this.state_machine.on_incoming(
									&mut *this.network,
									&mut this.notification_service,
									peer,
									vec![notification],
								);
								this.forwarding_state = ForwardingState::Busy(to_forward.into());
							},
						},
						// The network event stream closed. Do the same for [`GossipValidator`].
						Poll::Ready(None) => {
							self.is_terminated = true;
							return Poll::Ready(());
						},
						Poll::Pending => {},
					}

					match sync_event_stream {
						Poll::Ready(Some(event)) => match event {
							SyncEvent::PeerConnected(remote) => {
								this.network.add_set_reserved(remote, this.protocol.clone())
							},
							SyncEvent::PeerDisconnected(remote) => {
								this.network.remove_set_reserved(remote, this.protocol.clone())
							},
						},
						// The sync event stream closed. Do the same for [`GossipValidator`].
						Poll::Ready(None) => {
							self.is_terminated = true;
							return Poll::Ready(());
						},
						Poll::Pending => {},
					}
				},
				ForwardingState::Busy(to_forward) => {
					// Forward one buffered message at a time; go back to `Idle` once
					// the queue is drained.
					let (topic, notification) = match to_forward.pop_front() {
						Some(n) => n,
						None => {
							this.forwarding_state = ForwardingState::Idle;
							continue;
						},
					};

					// No subscribers for this topic: drop the message.
					let sinks = match this.message_sinks.get_mut(&topic) {
						Some(sinks) => sinks,
						None => continue,
					};

					// Make sure all sinks for the given topic are ready.
					for sink in sinks.iter_mut() {
						match sink.poll_ready(cx) {
							Poll::Ready(Ok(())) => {},
							// Receiver has been dropped. Ignore for now, filtered out in (1).
							Poll::Ready(Err(_)) => {},
							Poll::Pending => {
								// Push back onto queue for later.
								to_forward.push_front((topic, notification));
								break 'outer;
							},
						}
					}

					// Filter out all closed sinks.
					sinks.retain(|sink| !sink.is_closed()); // (1)

					if sinks.is_empty() {
						this.message_sinks.remove(&topic);
						continue;
					}

					trace!(
						target: "gossip",
						"Pushing consensus message to sinks for {}.", topic,
					);

					// Send the notification on each sink.
					for sink in sinks {
						match sink.start_send(notification.clone()) {
							Ok(()) => {},
							Err(e) if e.is_full() => {
								unreachable!("Previously ensured that all sinks are ready; qed.")
							},
							// Receiver got dropped. Will be removed in next iteration (See (1)).
							Err(_) => {},
						}
					}
				},
			}
		}

		// Run periodic maintenance: tick the state machine and drop sinks whose
		// receivers have gone away, so topics without subscribers are cleaned up.
		while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
			this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
			this.state_machine.tick(&mut this.notification_service);

			this.message_sinks.retain(|_, sinks| {
				sinks.retain(|sink| !sink.is_closed());
				!sinks.is_empty()
			});
		}

		Poll::Pending
	}
}
330
impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
	/// Returns `true` once one of the underlying event streams has ended,
	/// signalling that the engine must not be polled again.
	fn is_terminated(&self) -> bool {
		self.is_terminated
	}
}
336
337#[cfg(test)]
338mod tests {
339	use super::*;
340	use crate::gossip::{ValidationResult, ValidatorContext};
341	use codec::{DecodeAll, Encode};
342	use futures::{
343		channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
344		executor::{block_on, block_on_stream},
345		future::poll_fn,
346	};
347	use quickcheck::{Arbitrary, Gen, QuickCheck};
348	use soil_network::common::role::ObservedRole;
349	use soil_network::sync::SyncEventStream;
350	use soil_network::types::multiaddr::Multiaddr;
351	use soil_network::{
352		config::MultiaddrWithPeerId,
353		service::traits::{Direction, MessageSink, NotificationEvent},
354		Event, NetworkBlock, NetworkEventStream, NetworkPeers, NotificationService, Roles,
355	};
356	use std::{
357		collections::HashSet,
358		sync::{Arc, Mutex},
359	};
360	use subsoil::runtime::{
361		testing::H256,
362		traits::{Block as BlockT, NumberFor},
363	};
364	use soil_test_node_runtime_client::runtime::Block;
365
	/// Minimal network stub; only `report_peer` and `peer_role` are functional,
	/// everything else is `unimplemented!()`.
	#[derive(Clone, Default)]
	struct TestNetwork {}
368
369	#[async_trait::async_trait]
370	impl NetworkPeers for TestNetwork {
371		fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
372			unimplemented!();
373		}
374
375		fn set_authorized_only(&self, _reserved_only: bool) {
376			unimplemented!();
377		}
378
379		fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
380			unimplemented!();
381		}
382
383		fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {}
384
385		fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
386			unimplemented!()
387		}
388
389		fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
390			unimplemented!();
391		}
392
393		fn accept_unreserved_peers(&self) {
394			unimplemented!();
395		}
396
397		fn deny_unreserved_peers(&self) {
398			unimplemented!();
399		}
400
401		fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
402			unimplemented!();
403		}
404
405		fn remove_reserved_peer(&self, _peer_id: PeerId) {
406			unimplemented!();
407		}
408
409		fn set_reserved_peers(
410			&self,
411			_protocol: ProtocolName,
412			_peers: HashSet<Multiaddr>,
413		) -> Result<(), String> {
414			unimplemented!();
415		}
416
417		fn add_peers_to_reserved_set(
418			&self,
419			_protocol: ProtocolName,
420			_peers: HashSet<Multiaddr>,
421		) -> Result<(), String> {
422			unimplemented!();
423		}
424
425		fn remove_peers_from_reserved_set(
426			&self,
427			_protocol: ProtocolName,
428			_peers: Vec<PeerId>,
429		) -> Result<(), String> {
430			unimplemented!();
431		}
432
433		fn sync_num_connected(&self) -> usize {
434			unimplemented!();
435		}
436
437		fn peer_role(&self, _peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
438			Roles::decode_all(&mut &handshake[..])
439				.ok()
440				.and_then(|role| Some(ObservedRole::from(role)))
441		}
442
443		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
444			unimplemented!();
445		}
446	}
447
	// The gossip engine under test never requests a network event stream.
	impl NetworkEventStream for TestNetwork {
		fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
			unimplemented!();
		}
	}
453
	// Block announcement is unused by these tests; both methods are stubs.
	impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestNetwork {
		fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
			unimplemented!();
		}

		fn new_best_block_imported(
			&self,
			_hash: <Block as BlockT>::Hash,
			_number: NumberFor<Block>,
		) {
			unimplemented!();
		}
	}
467
	/// Syncing stub whose event stream the tests can feed manually.
	#[derive(Clone, Default)]
	struct TestSync {
		inner: Arc<Mutex<TestSyncInner>>,
	}
472
	/// Shared state of [`TestSync`]: one sender per event-stream subscription.
	#[derive(Clone, Default)]
	struct TestSyncInner {
		event_senders: Vec<UnboundedSender<SyncEvent>>,
	}
477
478	impl SyncEventStream for TestSync {
479		fn event_stream(
480			&self,
481			_name: &'static str,
482		) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
483			let (tx, rx) = unbounded();
484			self.inner.lock().unwrap().event_senders.push(tx);
485
486			Box::pin(rx)
487		}
488	}
489
	// Block announcement is unused by these tests; both methods are stubs.
	impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestSync {
		fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
			unimplemented!();
		}

		fn new_best_block_imported(
			&self,
			_hash: <Block as BlockT>::Hash,
			_number: NumberFor<Block>,
		) {
			unimplemented!();
		}
	}
503
	/// Notification-service stub that replays events pushed into `rx` by the test.
	#[derive(Debug)]
	pub(crate) struct TestNotificationService {
		// Receiving end of the channel the test writes `NotificationEvent`s into.
		rx: UnboundedReceiver<NotificationEvent>,
	}
508
	// Only `next_event` is functional — it drains the test-controlled channel.
	// All sending/handshake operations are unused by these tests.
	#[async_trait::async_trait]
	impl soil_network::service::traits::NotificationService for TestNotificationService {
		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!();
		}

		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!();
		}

		fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
			unimplemented!();
		}

		async fn send_async_notification(
			&mut self,
			_peer: &PeerId,
			_notification: Vec<u8>,
		) -> Result<(), soil_network::error::Error> {
			unimplemented!();
		}

		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!();
		}

		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!();
		}

		// Yields events injected by the test; `None` once the sender is dropped.
		async fn next_event(&mut self) -> Option<NotificationEvent> {
			self.rx.next().await
		}

		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
			unimplemented!();
		}

		fn protocol(&self) -> &ProtocolName {
			unimplemented!();
		}

		fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
			unimplemented!();
		}
	}
555
	/// Validator accepting every message, keeping all of them under the default topic.
	struct AllowAll;
	impl Validator<Block> for AllowAll {
		fn validate(
			&self,
			_context: &mut dyn ValidatorContext<Block>,
			_sender: &PeerId,
			_data: &[u8],
		) -> ValidationResult<H256> {
			ValidationResult::ProcessAndKeep(H256::default())
		}
	}
567
568	/// Regression test for the case where the `GossipEngine.network_event_stream` closes. One
569	/// should not ignore a `Poll::Ready(None)` as `poll_next_unpin` will panic on subsequent calls.
570	///
571	/// See https://github.com/paritytech/substrate/issues/5000 for details.
572	#[test]
573	fn returns_when_network_event_stream_closes() {
574		let network = TestNetwork::default();
575		let sync = Arc::new(TestSync::default());
576		let (tx, rx) = unbounded();
577		let notification_service = Box::new(TestNotificationService { rx });
578		let mut gossip_engine = GossipEngine::<Block>::new(
579			network.clone(),
580			sync,
581			notification_service,
582			"/my_protocol",
583			Arc::new(AllowAll {}),
584			None,
585		);
586
587		// drop notification service sender side.
588		drop(tx);
589
590		block_on(poll_fn(move |ctx| {
591			if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
592				panic!(
593					"Expected gossip engine to finish on first poll, given that \
594					 `GossipEngine.network_event_stream` closes right away."
595				)
596			}
597			Poll::Ready(())
598		}))
599	}
600
	/// Messages received both before and after a subscription is created must reach
	/// every subscriber of the topic, in order.
	#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
	async fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
		let topic = H256::default();
		let protocol = ProtocolName::from("/my_protocol");
		let remote_peer = PeerId::random();
		let network = TestNetwork::default();
		let sync = Arc::new(TestSync::default());
		let (mut tx, rx) = unbounded();
		let notification_service = Box::new(TestNotificationService { rx });

		let mut gossip_engine = GossipEngine::<Block>::new(
			network.clone(),
			sync.clone(),
			notification_service,
			protocol.clone(),
			Arc::new(AllowAll {}),
			None,
		);

		// Register the remote peer.
		tx.send(NotificationEvent::NotificationStreamOpened {
			peer: remote_peer,
			direction: Direction::Inbound,
			negotiated_fallback: None,
			handshake: Roles::FULL.encode(),
		})
		.await
		.unwrap();

		let messages = vec![vec![1], vec![2]];

		// Send first event before subscribing.
		tx.send(NotificationEvent::NotificationReceived {
			peer: remote_peer,
			notification: messages[0].clone().into(),
		})
		.await
		.unwrap();

		// Two independent subscribers for the same topic.
		let mut subscribers = vec![];
		for _ in 0..2 {
			subscribers.push(gossip_engine.messages_for(topic));
		}

		// Send second event after subscribing.
		tx.send(NotificationEvent::NotificationReceived {
			peer: remote_peer,
			notification: messages[1].clone().into(),
		})
		.await
		.unwrap();

		// Drive the engine on the runtime so it can forward the buffered messages.
		tokio::spawn(gossip_engine);

		// Note: `block_on_stream()`-derived iterator will block the current thread,
		//       so we need a `multi_thread` `tokio::test` runtime flavor.
		let mut subscribers =
			subscribers.into_iter().map(|s| block_on_stream(s)).collect::<Vec<_>>();

		// Expect each subscriber to receive both events.
		for message in messages {
			for subscriber in subscribers.iter_mut() {
				assert_eq!(
					subscriber.next(),
					Some(TopicNotification { message: message.clone(), sender: Some(remote_peer) }),
				);
			}
		}
	}
670
671	#[test]
672	fn forwarding_to_different_size_and_topic_channels() {
673		#[derive(Clone, Debug)]
674		struct ChannelLengthAndTopic {
675			length: usize,
676			topic: H256,
677		}
678
679		impl Arbitrary for ChannelLengthAndTopic {
680			fn arbitrary(g: &mut Gen) -> Self {
681				let possible_length = (0..100).collect::<Vec<usize>>();
682				let possible_topics = (0..10).collect::<Vec<u64>>();
683				Self {
684					length: *g.choose(&possible_length).unwrap(),
685					// Make sure channel topics and message topics overlap by choosing a small
686					// range.
687					topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
688				}
689			}
690		}
691
692		#[derive(Clone, Debug)]
693		struct Message {
694			topic: H256,
695		}
696
697		impl Arbitrary for Message {
698			fn arbitrary(g: &mut Gen) -> Self {
699				let possible_topics = (0..10).collect::<Vec<u64>>();
700				Self {
701					// Make sure channel topics and message topics overlap by choosing a small
702					// range.
703					topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
704				}
705			}
706		}
707
708		/// Validator that always returns `ProcessAndKeep` interpreting the first 32 bytes of data
709		/// as the message topic.
710		struct TestValidator;
711
712		impl Validator<Block> for TestValidator {
713			fn validate(
714				&self,
715				_context: &mut dyn ValidatorContext<Block>,
716				_sender: &PeerId,
717				data: &[u8],
718			) -> ValidationResult<H256> {
719				ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
720			}
721		}
722
723		fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
724			let protocol = ProtocolName::from("/my_protocol");
725			let remote_peer = PeerId::random();
726			let network = TestNetwork::default();
727			let sync = Arc::new(TestSync::default());
728			let (mut tx, rx) = unbounded();
729			let notification_service = Box::new(TestNotificationService { rx });
730
731			let num_channels_per_topic = channels.iter().fold(
732				HashMap::new(),
733				|mut acc, ChannelLengthAndTopic { topic, .. }| {
734					acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
735					acc
736				},
737			);
738
739			let expected_msgs_per_topic_all_chan = notifications
740				.iter()
741				.fold(HashMap::new(), |mut acc, messages| {
742					for message in messages {
743						acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
744					}
745					acc
746				})
747				.into_iter()
748				// Messages are cloned for each channel with the corresponding topic, thus multiply
749				// with the amount of channels per topic. If there is no channel for a given topic,
750				// don't expect any messages for the topic to be received.
751				.map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
752				.collect::<HashMap<H256, _>>();
753
754			let mut gossip_engine = GossipEngine::<Block>::new(
755				network.clone(),
756				sync.clone(),
757				notification_service,
758				protocol.clone(),
759				Arc::new(TestValidator {}),
760				None,
761			);
762
763			// Create channels.
764			let (txs, mut rxs) = channels
765				.iter()
766				.map(|ChannelLengthAndTopic { length, topic }| (*topic, channel(*length)))
767				.fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
768					acc.0.push((topic, tx));
769					acc.1.push((topic, rx));
770					acc
771				});
772
773			// Insert sender sides into `gossip_engine`.
774			for (topic, tx) in txs {
775				match gossip_engine.message_sinks.get_mut(&topic) {
776					Some(entry) => entry.push(tx),
777					None => {
778						gossip_engine.message_sinks.insert(topic, vec![tx]);
779					},
780				}
781			}
782
783			// Register the remote peer.
784			tx.start_send(NotificationEvent::NotificationStreamOpened {
785				peer: remote_peer,
786				direction: Direction::Inbound,
787				negotiated_fallback: None,
788				handshake: Roles::FULL.encode(),
789			})
790			.unwrap();
791
792			// Send messages into the network event stream.
793			for (i_notification, messages) in notifications.iter().enumerate() {
794				let messages: Vec<Vec<u8>> = messages
795					.into_iter()
796					.enumerate()
797					.map(|(i_message, Message { topic })| {
798						// Embed the topic in the first 256 bytes of the message to be extracted by
799						// the [`TestValidator`] later on.
800						let mut message = topic.as_bytes().to_vec();
801
802						// Make sure the message is unique via `i_notification` and `i_message` to
803						// ensure [`ConsensusBridge`] does not deduplicate it.
804						message.push(i_notification.try_into().unwrap());
805						message.push(i_message.try_into().unwrap());
806
807						message.into()
808					})
809					.collect();
810
811				for message in messages {
812					tx.start_send(NotificationEvent::NotificationReceived {
813						peer: remote_peer,
814						notification: message,
815					})
816					.unwrap();
817				}
818			}
819
820			let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();
821
822			// Poll both gossip engine and each receiver and track the amount of received messages.
823			block_on(poll_fn(|cx| {
824				loop {
825					if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
826						unreachable!(
827							"Event stream sender side is not dropped, thus gossip engine does not \
828							 terminate",
829						);
830					}
831
832					let mut progress = false;
833
834					for (topic, rx) in rxs.iter_mut() {
835						match rx.poll_next_unpin(cx) {
836							Poll::Ready(Some(_)) => {
837								progress = true;
838								received_msgs_per_topic_all_chan
839									.entry(*topic)
840									.and_modify(|e| *e += 1)
841									.or_insert(1);
842							},
843							Poll::Ready(None) => {
844								unreachable!("Sender side of channel is never dropped")
845							},
846							Poll::Pending => {},
847						}
848					}
849
850					if !progress {
851						break;
852					}
853				}
854				Poll::Ready(())
855			}));
856
857			// Compare amount of expected messages with amount of received messages.
858			for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
859				assert_eq!(
860					received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0),
861					expected_num,
862				);
863			}
864			for (received_topic, received_num) in expected_msgs_per_topic_all_chan.iter() {
865				assert_eq!(
866					expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0),
867					received_num,
868				);
869			}
870		}
871
872		// Past regressions.
873		prop(vec![], vec![vec![Message { topic: H256::default() }]]);
874		prop(
875			vec![ChannelLengthAndTopic { length: 71, topic: H256::default() }],
876			vec![vec![Message { topic: H256::default() }]],
877		);
878
879		QuickCheck::new().quickcheck(prop as fn(_, _))
880	}
881}