1use crate::gossip::{
8 state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL},
9 Network, Syncing, Validator,
10};
11
12use soil_network::sync::SyncEvent;
13use soil_network::{
14 service::traits::{NotificationEvent, ValidationResult},
15 types::ProtocolName,
16 NotificationService, ReputationChange,
17};
18
19use futures::{
20 channel::mpsc::{channel, Receiver, Sender},
21 prelude::*,
22};
23use log::trace;
24use soil_prometheus::Registry;
25use soil_network::types::PeerId;
26use std::{
27 collections::{HashMap, VecDeque},
28 pin::Pin,
29 sync::Arc,
30 task::{Context, Poll},
31};
32use subsoil::runtime::traits::Block as BlockT;
33
34pub struct GossipEngine<B: BlockT> {
37 state_machine: ConsensusGossip<B>,
38 network: Box<dyn Network<B> + Send>,
39 sync: Box<dyn Syncing<B>>,
40 periodic_maintenance_interval: futures_timer::Delay,
41 protocol: ProtocolName,
42
43 sync_event_stream: Pin<Box<dyn Stream<Item = SyncEvent> + Send>>,
45 notification_service: Box<dyn NotificationService>,
47 message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
49 forwarding_state: ForwardingState<B>,
51
52 is_terminated: bool,
53}
54
55enum ForwardingState<B: BlockT> {
60 Idle,
63 Busy(VecDeque<(B::Hash, TopicNotification)>),
67}
68
69impl<B: BlockT> Unpin for GossipEngine<B> {}
70
71impl<B: BlockT> GossipEngine<B> {
72 pub fn new<N, S>(
74 network: N,
75 sync: S,
76 notification_service: Box<dyn NotificationService>,
77 protocol: impl Into<ProtocolName>,
78 validator: Arc<dyn Validator<B>>,
79 metrics_registry: Option<&Registry>,
80 ) -> Self
81 where
82 B: 'static,
83 N: Network<B> + Send + Clone + 'static,
84 S: Syncing<B> + Send + Clone + 'static,
85 {
86 let protocol = protocol.into();
87 let sync_event_stream = sync.event_stream("network-gossip");
88
89 GossipEngine {
90 state_machine: ConsensusGossip::new(validator, protocol.clone(), metrics_registry),
91 network: Box::new(network),
92 sync: Box::new(sync),
93 notification_service,
94 periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
95 protocol,
96
97 sync_event_stream,
98 message_sinks: HashMap::new(),
99 forwarding_state: ForwardingState::Idle,
100
101 is_terminated: false,
102 }
103 }
104
105 pub fn report(&self, who: PeerId, reputation: ReputationChange) {
106 self.network.report_peer(who, reputation);
107 }
108
109 pub fn register_gossip_message(&mut self, topic: B::Hash, message: Vec<u8>) {
115 self.state_machine.register_message(topic, message);
116 }
117
118 pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
120 self.state_machine.broadcast_topic(&mut self.notification_service, topic, force);
121 }
122
123 pub fn messages_for(&mut self, topic: B::Hash) -> Receiver<TopicNotification> {
125 let past_messages = self.state_machine.messages_for(topic).collect::<Vec<_>>();
126 let (mut tx, rx) = channel(usize::max(past_messages.len(), 10));
132
133 for notification in past_messages {
134 tx.try_send(notification)
135 .expect("receiver known to be live, and buffer size known to suffice; qed");
136 }
137
138 self.message_sinks.entry(topic).or_default().push(tx);
139
140 rx
141 }
142
143 pub fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
145 self.state_machine.send_topic(&mut self.notification_service, who, topic, force)
146 }
147
148 pub fn gossip_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
150 self.state_machine
151 .multicast(&mut self.notification_service, topic, message, force)
152 }
153
154 pub fn send_message(&mut self, who: Vec<PeerId>, data: Vec<u8>) {
157 for who in &who {
158 self.state_machine
159 .send_message(&mut self.notification_service, who, data.clone());
160 }
161 }
162
163 pub fn announce(&self, block: B::Hash, associated_data: Option<Vec<u8>>) {
168 self.sync.announce_block(block, associated_data);
169 }
170
171 pub fn take_notification_service(self) -> Box<dyn NotificationService> {
173 self.notification_service
174 }
175}
176
177impl<B: BlockT> Future for GossipEngine<B> {
178 type Output = ();
179
180 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
181 let this = &mut *self;
182
183 'outer: loop {
184 match &mut this.forwarding_state {
185 ForwardingState::Idle => {
186 let next_notification_event =
187 this.notification_service.next_event().poll_unpin(cx);
188 let sync_event_stream = this.sync_event_stream.poll_next_unpin(cx);
189
190 if next_notification_event.is_pending() && sync_event_stream.is_pending() {
191 break;
192 }
193
194 match next_notification_event {
195 Poll::Ready(Some(event)) => match event {
196 NotificationEvent::ValidateInboundSubstream {
197 peer,
198 handshake,
199 result_tx,
200 ..
201 } => {
202 let result = this
204 .network
205 .peer_role(peer, handshake)
206 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
207 let _ = result_tx.send(result);
208 },
209 NotificationEvent::NotificationStreamOpened {
210 peer, handshake, ..
211 } => {
212 if let Some(role) = this.network.peer_role(peer, handshake) {
213 this.state_machine.new_peer(
214 &mut this.notification_service,
215 peer,
216 role,
217 );
218 } else {
219 log::debug!(target: "gossip", "role for {peer} couldn't be determined");
220 }
221 },
222 NotificationEvent::NotificationStreamClosed { peer } => {
223 this.state_machine
224 .peer_disconnected(&mut this.notification_service, peer);
225 },
226 NotificationEvent::NotificationReceived { peer, notification } => {
227 let to_forward = this.state_machine.on_incoming(
228 &mut *this.network,
229 &mut this.notification_service,
230 peer,
231 vec![notification],
232 );
233 this.forwarding_state = ForwardingState::Busy(to_forward.into());
234 },
235 },
236 Poll::Ready(None) => {
238 self.is_terminated = true;
239 return Poll::Ready(());
240 },
241 Poll::Pending => {},
242 }
243
244 match sync_event_stream {
245 Poll::Ready(Some(event)) => match event {
246 SyncEvent::PeerConnected(remote) => {
247 this.network.add_set_reserved(remote, this.protocol.clone())
248 },
249 SyncEvent::PeerDisconnected(remote) => {
250 this.network.remove_set_reserved(remote, this.protocol.clone())
251 },
252 },
253 Poll::Ready(None) => {
255 self.is_terminated = true;
256 return Poll::Ready(());
257 },
258 Poll::Pending => {},
259 }
260 },
261 ForwardingState::Busy(to_forward) => {
262 let (topic, notification) = match to_forward.pop_front() {
263 Some(n) => n,
264 None => {
265 this.forwarding_state = ForwardingState::Idle;
266 continue;
267 },
268 };
269
270 let sinks = match this.message_sinks.get_mut(&topic) {
271 Some(sinks) => sinks,
272 None => continue,
273 };
274
275 for sink in sinks.iter_mut() {
277 match sink.poll_ready(cx) {
278 Poll::Ready(Ok(())) => {},
279 Poll::Ready(Err(_)) => {},
281 Poll::Pending => {
282 to_forward.push_front((topic, notification));
284 break 'outer;
285 },
286 }
287 }
288
289 sinks.retain(|sink| !sink.is_closed()); if sinks.is_empty() {
293 this.message_sinks.remove(&topic);
294 continue;
295 }
296
297 trace!(
298 target: "gossip",
299 "Pushing consensus message to sinks for {}.", topic,
300 );
301
302 for sink in sinks {
304 match sink.start_send(notification.clone()) {
305 Ok(()) => {},
306 Err(e) if e.is_full() => {
307 unreachable!("Previously ensured that all sinks are ready; qed.")
308 },
309 Err(_) => {},
311 }
312 }
313 },
314 }
315 }
316
317 while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
318 this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
319 this.state_machine.tick(&mut this.notification_service);
320
321 this.message_sinks.retain(|_, sinks| {
322 sinks.retain(|sink| !sink.is_closed());
323 !sinks.is_empty()
324 });
325 }
326
327 Poll::Pending
328 }
329}
330
331impl<B: BlockT> futures::future::FusedFuture for GossipEngine<B> {
332 fn is_terminated(&self) -> bool {
333 self.is_terminated
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use crate::gossip::{ValidationResult, ValidatorContext};
341 use codec::{DecodeAll, Encode};
342 use futures::{
343 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
344 executor::{block_on, block_on_stream},
345 future::poll_fn,
346 };
347 use quickcheck::{Arbitrary, Gen, QuickCheck};
348 use soil_network::common::role::ObservedRole;
349 use soil_network::sync::SyncEventStream;
350 use soil_network::types::multiaddr::Multiaddr;
351 use soil_network::{
352 config::MultiaddrWithPeerId,
353 service::traits::{Direction, MessageSink, NotificationEvent},
354 Event, NetworkBlock, NetworkEventStream, NetworkPeers, NotificationService, Roles,
355 };
356 use std::{
357 collections::HashSet,
358 sync::{Arc, Mutex},
359 };
360 use subsoil::runtime::{
361 testing::H256,
362 traits::{Block as BlockT, NumberFor},
363 };
364 use soil_test_node_runtime_client::runtime::Block;
365
366 #[derive(Clone, Default)]
367 struct TestNetwork {}
368
369 #[async_trait::async_trait]
370 impl NetworkPeers for TestNetwork {
371 fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
372 unimplemented!();
373 }
374
375 fn set_authorized_only(&self, _reserved_only: bool) {
376 unimplemented!();
377 }
378
379 fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
380 unimplemented!();
381 }
382
383 fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {}
384
385 fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
386 unimplemented!()
387 }
388
389 fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
390 unimplemented!();
391 }
392
393 fn accept_unreserved_peers(&self) {
394 unimplemented!();
395 }
396
397 fn deny_unreserved_peers(&self) {
398 unimplemented!();
399 }
400
401 fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
402 unimplemented!();
403 }
404
405 fn remove_reserved_peer(&self, _peer_id: PeerId) {
406 unimplemented!();
407 }
408
409 fn set_reserved_peers(
410 &self,
411 _protocol: ProtocolName,
412 _peers: HashSet<Multiaddr>,
413 ) -> Result<(), String> {
414 unimplemented!();
415 }
416
417 fn add_peers_to_reserved_set(
418 &self,
419 _protocol: ProtocolName,
420 _peers: HashSet<Multiaddr>,
421 ) -> Result<(), String> {
422 unimplemented!();
423 }
424
425 fn remove_peers_from_reserved_set(
426 &self,
427 _protocol: ProtocolName,
428 _peers: Vec<PeerId>,
429 ) -> Result<(), String> {
430 unimplemented!();
431 }
432
433 fn sync_num_connected(&self) -> usize {
434 unimplemented!();
435 }
436
437 fn peer_role(&self, _peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
438 Roles::decode_all(&mut &handshake[..])
439 .ok()
440 .and_then(|role| Some(ObservedRole::from(role)))
441 }
442
443 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
444 unimplemented!();
445 }
446 }
447
448 impl NetworkEventStream for TestNetwork {
449 fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
450 unimplemented!();
451 }
452 }
453
454 impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestNetwork {
455 fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
456 unimplemented!();
457 }
458
459 fn new_best_block_imported(
460 &self,
461 _hash: <Block as BlockT>::Hash,
462 _number: NumberFor<Block>,
463 ) {
464 unimplemented!();
465 }
466 }
467
468 #[derive(Clone, Default)]
469 struct TestSync {
470 inner: Arc<Mutex<TestSyncInner>>,
471 }
472
473 #[derive(Clone, Default)]
474 struct TestSyncInner {
475 event_senders: Vec<UnboundedSender<SyncEvent>>,
476 }
477
478 impl SyncEventStream for TestSync {
479 fn event_stream(
480 &self,
481 _name: &'static str,
482 ) -> Pin<Box<dyn Stream<Item = SyncEvent> + Send>> {
483 let (tx, rx) = unbounded();
484 self.inner.lock().unwrap().event_senders.push(tx);
485
486 Box::pin(rx)
487 }
488 }
489
490 impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for TestSync {
491 fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
492 unimplemented!();
493 }
494
495 fn new_best_block_imported(
496 &self,
497 _hash: <Block as BlockT>::Hash,
498 _number: NumberFor<Block>,
499 ) {
500 unimplemented!();
501 }
502 }
503
504 #[derive(Debug)]
505 pub(crate) struct TestNotificationService {
506 rx: UnboundedReceiver<NotificationEvent>,
507 }
508
509 #[async_trait::async_trait]
510 impl soil_network::service::traits::NotificationService for TestNotificationService {
511 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
512 unimplemented!();
513 }
514
515 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
516 unimplemented!();
517 }
518
519 fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
520 unimplemented!();
521 }
522
523 async fn send_async_notification(
524 &mut self,
525 _peer: &PeerId,
526 _notification: Vec<u8>,
527 ) -> Result<(), soil_network::error::Error> {
528 unimplemented!();
529 }
530
531 async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
532 unimplemented!();
533 }
534
535 fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
536 unimplemented!();
537 }
538
539 async fn next_event(&mut self) -> Option<NotificationEvent> {
540 self.rx.next().await
541 }
542
543 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
544 unimplemented!();
545 }
546
547 fn protocol(&self) -> &ProtocolName {
548 unimplemented!();
549 }
550
551 fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
552 unimplemented!();
553 }
554 }
555
556 struct AllowAll;
557 impl Validator<Block> for AllowAll {
558 fn validate(
559 &self,
560 _context: &mut dyn ValidatorContext<Block>,
561 _sender: &PeerId,
562 _data: &[u8],
563 ) -> ValidationResult<H256> {
564 ValidationResult::ProcessAndKeep(H256::default())
565 }
566 }
567
568 #[test]
573 fn returns_when_network_event_stream_closes() {
574 let network = TestNetwork::default();
575 let sync = Arc::new(TestSync::default());
576 let (tx, rx) = unbounded();
577 let notification_service = Box::new(TestNotificationService { rx });
578 let mut gossip_engine = GossipEngine::<Block>::new(
579 network.clone(),
580 sync,
581 notification_service,
582 "/my_protocol",
583 Arc::new(AllowAll {}),
584 None,
585 );
586
587 drop(tx);
589
590 block_on(poll_fn(move |ctx| {
591 if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
592 panic!(
593 "Expected gossip engine to finish on first poll, given that \
594 `GossipEngine.network_event_stream` closes right away."
595 )
596 }
597 Poll::Ready(())
598 }))
599 }
600
601 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
602 async fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
603 let topic = H256::default();
604 let protocol = ProtocolName::from("/my_protocol");
605 let remote_peer = PeerId::random();
606 let network = TestNetwork::default();
607 let sync = Arc::new(TestSync::default());
608 let (mut tx, rx) = unbounded();
609 let notification_service = Box::new(TestNotificationService { rx });
610
611 let mut gossip_engine = GossipEngine::<Block>::new(
612 network.clone(),
613 sync.clone(),
614 notification_service,
615 protocol.clone(),
616 Arc::new(AllowAll {}),
617 None,
618 );
619
620 tx.send(NotificationEvent::NotificationStreamOpened {
622 peer: remote_peer,
623 direction: Direction::Inbound,
624 negotiated_fallback: None,
625 handshake: Roles::FULL.encode(),
626 })
627 .await
628 .unwrap();
629
630 let messages = vec![vec![1], vec![2]];
631
632 tx.send(NotificationEvent::NotificationReceived {
634 peer: remote_peer,
635 notification: messages[0].clone().into(),
636 })
637 .await
638 .unwrap();
639
640 let mut subscribers = vec![];
641 for _ in 0..2 {
642 subscribers.push(gossip_engine.messages_for(topic));
643 }
644
645 tx.send(NotificationEvent::NotificationReceived {
647 peer: remote_peer,
648 notification: messages[1].clone().into(),
649 })
650 .await
651 .unwrap();
652
653 tokio::spawn(gossip_engine);
654
655 let mut subscribers =
658 subscribers.into_iter().map(|s| block_on_stream(s)).collect::<Vec<_>>();
659
660 for message in messages {
662 for subscriber in subscribers.iter_mut() {
663 assert_eq!(
664 subscriber.next(),
665 Some(TopicNotification { message: message.clone(), sender: Some(remote_peer) }),
666 );
667 }
668 }
669 }
670
671 #[test]
672 fn forwarding_to_different_size_and_topic_channels() {
673 #[derive(Clone, Debug)]
674 struct ChannelLengthAndTopic {
675 length: usize,
676 topic: H256,
677 }
678
679 impl Arbitrary for ChannelLengthAndTopic {
680 fn arbitrary(g: &mut Gen) -> Self {
681 let possible_length = (0..100).collect::<Vec<usize>>();
682 let possible_topics = (0..10).collect::<Vec<u64>>();
683 Self {
684 length: *g.choose(&possible_length).unwrap(),
685 topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
688 }
689 }
690 }
691
692 #[derive(Clone, Debug)]
693 struct Message {
694 topic: H256,
695 }
696
697 impl Arbitrary for Message {
698 fn arbitrary(g: &mut Gen) -> Self {
699 let possible_topics = (0..10).collect::<Vec<u64>>();
700 Self {
701 topic: H256::from_low_u64_ne(*g.choose(&possible_topics).unwrap()),
704 }
705 }
706 }
707
708 struct TestValidator;
711
712 impl Validator<Block> for TestValidator {
713 fn validate(
714 &self,
715 _context: &mut dyn ValidatorContext<Block>,
716 _sender: &PeerId,
717 data: &[u8],
718 ) -> ValidationResult<H256> {
719 ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
720 }
721 }
722
723 fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
724 let protocol = ProtocolName::from("/my_protocol");
725 let remote_peer = PeerId::random();
726 let network = TestNetwork::default();
727 let sync = Arc::new(TestSync::default());
728 let (mut tx, rx) = unbounded();
729 let notification_service = Box::new(TestNotificationService { rx });
730
731 let num_channels_per_topic = channels.iter().fold(
732 HashMap::new(),
733 |mut acc, ChannelLengthAndTopic { topic, .. }| {
734 acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
735 acc
736 },
737 );
738
739 let expected_msgs_per_topic_all_chan = notifications
740 .iter()
741 .fold(HashMap::new(), |mut acc, messages| {
742 for message in messages {
743 acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
744 }
745 acc
746 })
747 .into_iter()
748 .map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
752 .collect::<HashMap<H256, _>>();
753
754 let mut gossip_engine = GossipEngine::<Block>::new(
755 network.clone(),
756 sync.clone(),
757 notification_service,
758 protocol.clone(),
759 Arc::new(TestValidator {}),
760 None,
761 );
762
763 let (txs, mut rxs) = channels
765 .iter()
766 .map(|ChannelLengthAndTopic { length, topic }| (*topic, channel(*length)))
767 .fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
768 acc.0.push((topic, tx));
769 acc.1.push((topic, rx));
770 acc
771 });
772
773 for (topic, tx) in txs {
775 match gossip_engine.message_sinks.get_mut(&topic) {
776 Some(entry) => entry.push(tx),
777 None => {
778 gossip_engine.message_sinks.insert(topic, vec![tx]);
779 },
780 }
781 }
782
783 tx.start_send(NotificationEvent::NotificationStreamOpened {
785 peer: remote_peer,
786 direction: Direction::Inbound,
787 negotiated_fallback: None,
788 handshake: Roles::FULL.encode(),
789 })
790 .unwrap();
791
792 for (i_notification, messages) in notifications.iter().enumerate() {
794 let messages: Vec<Vec<u8>> = messages
795 .into_iter()
796 .enumerate()
797 .map(|(i_message, Message { topic })| {
798 let mut message = topic.as_bytes().to_vec();
801
802 message.push(i_notification.try_into().unwrap());
805 message.push(i_message.try_into().unwrap());
806
807 message.into()
808 })
809 .collect();
810
811 for message in messages {
812 tx.start_send(NotificationEvent::NotificationReceived {
813 peer: remote_peer,
814 notification: message,
815 })
816 .unwrap();
817 }
818 }
819
820 let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();
821
822 block_on(poll_fn(|cx| {
824 loop {
825 if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
826 unreachable!(
827 "Event stream sender side is not dropped, thus gossip engine does not \
828 terminate",
829 );
830 }
831
832 let mut progress = false;
833
834 for (topic, rx) in rxs.iter_mut() {
835 match rx.poll_next_unpin(cx) {
836 Poll::Ready(Some(_)) => {
837 progress = true;
838 received_msgs_per_topic_all_chan
839 .entry(*topic)
840 .and_modify(|e| *e += 1)
841 .or_insert(1);
842 },
843 Poll::Ready(None) => {
844 unreachable!("Sender side of channel is never dropped")
845 },
846 Poll::Pending => {},
847 }
848 }
849
850 if !progress {
851 break;
852 }
853 }
854 Poll::Ready(())
855 }));
856
857 for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
859 assert_eq!(
860 received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0),
861 expected_num,
862 );
863 }
864 for (received_topic, received_num) in expected_msgs_per_topic_all_chan.iter() {
865 assert_eq!(
866 expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0),
867 received_num,
868 );
869 }
870 }
871
872 prop(vec![], vec![vec![Message { topic: H256::default() }]]);
874 prop(
875 vec![ChannelLengthAndTopic { length: 71, topic: H256::default() }],
876 vec![vec![Message { topic: H256::default() }]],
877 );
878
879 QuickCheck::new().quickcheck(prop as fn(_, _))
880 }
881}