1use crate::gossip::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext};
8
9use ahash::AHashSet;
10use schnellru::{ByLength, LruMap};
11use soil_network::types::PeerId;
12
13use soil_prometheus::{register, Counter, PrometheusError, Registry, U64};
14use soil_network::common::role::ObservedRole;
15use soil_network::{types::ProtocolName, NotificationService};
16use std::{collections::HashMap, iter, sync::Arc, time, time::Instant};
17use subsoil::runtime::traits::{Block as BlockT, Hash, HashingFor};
18
19const KNOWN_MESSAGES_CACHE_SIZE: u32 = 8192;
29
30const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_millis(750);
31
32pub(crate) const PERIODIC_MAINTENANCE_INTERVAL: time::Duration = time::Duration::from_millis(1100);
33
34mod rep {
35 use soil_network::ReputationChange as Rep;
36 pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successful gossip");
38 pub const DUPLICATE_GOSSIP: Rep = Rep::new(-(1 << 2), "Duplicate gossip");
40}
41
42struct PeerConsensus<H> {
43 known_messages: AHashSet<H>,
44}
45
46#[derive(Clone, Debug, Eq, PartialEq)]
48pub struct TopicNotification {
49 pub message: Vec<u8>,
51 pub sender: Option<PeerId>,
53}
54
55struct MessageEntry<B: BlockT> {
56 message_hash: B::Hash,
57 topic: B::Hash,
58 message: Vec<u8>,
59 sender: Option<PeerId>,
60}
61
62struct NetworkContext<'g, 'p, B: BlockT> {
64 gossip: &'g mut ConsensusGossip<B>,
65 notification_service: &'p mut Box<dyn NotificationService>,
66}
67
68impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
69 fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
71 self.gossip.broadcast_topic(self.notification_service, topic, force);
72 }
73
74 fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
76 self.gossip.multicast(self.notification_service, topic, message, force);
77 }
78
79 fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
81 self.notification_service.send_sync_notification(who, message);
82 }
83
84 fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
86 self.gossip.send_topic(self.notification_service, who, topic, force);
87 }
88}
89
90fn propagate<'a, B: BlockT, I>(
91 notification_service: &mut Box<dyn NotificationService>,
92 protocol: ProtocolName,
93 messages: I,
94 intent: MessageIntent,
95 peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
96 validator: &Arc<dyn Validator<B>>,
97)
98where
100 I: Clone + IntoIterator<Item = (&'a B::Hash, &'a B::Hash, &'a Vec<u8>)>,
101{
102 let mut message_allowed = validator.message_allowed();
103
104 for (id, ref mut peer) in peers.iter_mut() {
105 for (message_hash, topic, message) in messages.clone() {
106 let intent = match intent {
107 MessageIntent::Broadcast { .. } => {
108 if peer.known_messages.contains(message_hash) {
109 continue;
110 } else {
111 MessageIntent::Broadcast
112 }
113 },
114 MessageIntent::PeriodicRebroadcast => {
115 if peer.known_messages.contains(message_hash) {
116 MessageIntent::PeriodicRebroadcast
117 } else {
118 MessageIntent::Broadcast
121 }
122 },
123 other => other,
124 };
125
126 if !message_allowed(id, intent, topic, message) {
127 continue;
128 }
129
130 peer.known_messages.insert(*message_hash);
131
132 tracing::trace!(
133 target: "gossip",
134 to = %id,
135 %protocol,
136 ?message,
137 "Propagating message",
138 );
139 notification_service.send_sync_notification(id, message.clone());
140 }
141 }
142}
143
144pub struct ConsensusGossip<B: BlockT> {
146 peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
147 messages: Vec<MessageEntry<B>>,
148 known_messages: LruMap<B::Hash, ()>,
149 protocol: ProtocolName,
150 validator: Arc<dyn Validator<B>>,
151 next_broadcast: Instant,
152 metrics: Option<Metrics>,
153}
154
155impl<B: BlockT> ConsensusGossip<B> {
156 pub fn new(
158 validator: Arc<dyn Validator<B>>,
159 protocol: ProtocolName,
160 metrics_registry: Option<&Registry>,
161 ) -> Self {
162 let metrics = match metrics_registry.map(Metrics::register) {
163 Some(Ok(metrics)) => Some(metrics),
164 Some(Err(e)) => {
165 tracing::debug!(target: "gossip", "Failed to register metrics: {:?}", e);
166 None
167 },
168 None => None,
169 };
170
171 ConsensusGossip {
172 peers: HashMap::new(),
173 messages: Default::default(),
174 known_messages: { LruMap::new(ByLength::new(KNOWN_MESSAGES_CACHE_SIZE)) },
175 protocol,
176 validator,
177 next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
178 metrics,
179 }
180 }
181
182 pub fn new_peer(
184 &mut self,
185 notification_service: &mut Box<dyn NotificationService>,
186 who: PeerId,
187 role: ObservedRole,
188 ) {
189 tracing::trace!(
190 target:"gossip",
191 %who,
192 protocol = %self.protocol,
193 ?role,
194 "Registering peer",
195 );
196 self.peers.insert(who, PeerConsensus { known_messages: Default::default() });
197
198 let validator = self.validator.clone();
199 let mut context = NetworkContext { gossip: self, notification_service };
200 validator.new_peer(&mut context, &who, role);
201 }
202
203 fn register_message_hashed(
204 &mut self,
205 message_hash: B::Hash,
206 topic: B::Hash,
207 message: Vec<u8>,
208 sender: Option<PeerId>,
209 ) {
210 if self.known_messages.insert(message_hash, ()) {
211 self.messages.push(MessageEntry { message_hash, topic, message, sender });
212
213 if let Some(ref metrics) = self.metrics {
214 metrics.registered_messages.inc();
215 }
216 }
217 }
218
219 pub fn register_message(&mut self, topic: B::Hash, message: Vec<u8>) {
225 let message_hash = HashingFor::<B>::hash(&message[..]);
226 self.register_message_hashed(message_hash, topic, message, None);
227 }
228
229 pub fn peer_disconnected(
231 &mut self,
232 notification_service: &mut Box<dyn NotificationService>,
233 who: PeerId,
234 ) {
235 let validator = self.validator.clone();
236 let mut context = NetworkContext { gossip: self, notification_service };
237 validator.peer_disconnected(&mut context, &who);
238 self.peers.remove(&who);
239 }
240
241 pub fn tick(&mut self, notification_service: &mut Box<dyn NotificationService>) {
243 self.collect_garbage();
244 if Instant::now() >= self.next_broadcast {
245 self.rebroadcast(notification_service);
246 self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
247 }
248 }
249
250 fn rebroadcast(&mut self, notification_service: &mut Box<dyn NotificationService>) {
252 let messages = self
253 .messages
254 .iter()
255 .map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
256
257 propagate(
258 notification_service,
259 self.protocol.clone(),
260 messages,
261 MessageIntent::PeriodicRebroadcast,
262 &mut self.peers,
263 &self.validator,
264 );
265 }
266
267 pub fn broadcast_topic(
269 &mut self,
270 notification_service: &mut Box<dyn NotificationService>,
271 topic: B::Hash,
272 force: bool,
273 ) {
274 let messages = self.messages.iter().filter_map(|entry| {
275 if entry.topic == topic {
276 Some((&entry.message_hash, &entry.topic, &entry.message))
277 } else {
278 None
279 }
280 });
281 let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
282 propagate(
283 notification_service,
284 self.protocol.clone(),
285 messages,
286 intent,
287 &mut self.peers,
288 &self.validator,
289 );
290 }
291
292 pub fn collect_garbage(&mut self) {
295 let known_messages = &mut self.known_messages;
296 let before = self.messages.len();
297
298 let mut message_expired = self.validator.message_expired();
299 self.messages.retain(|entry| !message_expired(entry.topic, &entry.message));
300
301 let expired_messages = before - self.messages.len();
302
303 if let Some(ref metrics) = self.metrics {
304 metrics.expired_messages.inc_by(expired_messages as u64)
305 }
306
307 tracing::trace!(
308 target: "gossip",
309 protocol = %self.protocol,
310 "Cleaned up {} stale messages, {} left ({} known)",
311 expired_messages,
312 self.messages.len(),
313 known_messages.len(),
314 );
315
316 for (_, ref mut peer) in self.peers.iter_mut() {
317 peer.known_messages.retain(|h| known_messages.get(h).is_some());
318 }
319 }
320
321 pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
323 self.messages
324 .iter()
325 .filter(move |e| e.topic == topic)
326 .map(|entry| TopicNotification { message: entry.message.clone(), sender: entry.sender })
327 }
328
329 pub fn on_incoming(
332 &mut self,
333 network: &mut dyn Network<B>,
334 notification_service: &mut Box<dyn NotificationService>,
335 who: PeerId,
336 messages: Vec<Vec<u8>>,
337 ) -> Vec<(B::Hash, TopicNotification)> {
338 let mut to_forward = vec![];
339
340 if !messages.is_empty() {
341 tracing::trace!(
342 target: "gossip",
343 messages_num = %messages.len(),
344 %who,
345 protocol = %self.protocol,
346 "Received messages from peer",
347 );
348 }
349
350 for message in messages {
351 let message_hash = HashingFor::<B>::hash(&message[..]);
352
353 if self.known_messages.get(&message_hash).is_some() {
354 tracing::trace!(
355 target: "gossip",
356 %who,
357 protocol = %self.protocol,
358 "Ignored already known message",
359 );
360
361 if self
363 .peers
364 .get_mut(&who)
365 .map_or(false, |p| !p.known_messages.insert(message_hash))
366 {
367 network.report_peer(who, rep::DUPLICATE_GOSSIP);
368 }
369 continue;
370 }
371
372 let validation = {
374 let validator = self.validator.clone();
375 let mut context = NetworkContext { gossip: self, notification_service };
376 validator.validate(&mut context, &who, &message)
377 };
378
379 let (topic, keep) = match validation {
380 ValidationResult::ProcessAndKeep(topic) => (topic, true),
381 ValidationResult::ProcessAndDiscard(topic) => (topic, false),
382 ValidationResult::Discard => {
383 tracing::trace!(
384 target: "gossip",
385 %who,
386 protocol = %self.protocol,
387 "Discard message from peer",
388 );
389 continue;
390 },
391 };
392
393 let peer = match self.peers.get_mut(&who) {
394 Some(peer) => peer,
395 None => {
396 tracing::error!(
397 target: "gossip",
398 %who,
399 protocol = %self.protocol,
400 "Got message from unregistered peer",
401 );
402 continue;
403 },
404 };
405
406 network.report_peer(who, rep::GOSSIP_SUCCESS);
407 peer.known_messages.insert(message_hash);
408 to_forward
409 .push((topic, TopicNotification { message: message.clone(), sender: Some(who) }));
410
411 if keep {
412 self.register_message_hashed(message_hash, topic, message, Some(who));
413 }
414 }
415
416 to_forward
417 }
418
419 pub fn send_topic(
421 &mut self,
422 notification_service: &mut Box<dyn NotificationService>,
423 who: &PeerId,
424 topic: B::Hash,
425 force: bool,
426 ) {
427 let mut message_allowed = self.validator.message_allowed();
428
429 if let Some(ref mut peer) = self.peers.get_mut(who) {
430 for entry in self.messages.iter().filter(|m| m.topic == topic) {
431 let intent =
432 if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
433
434 if !force && peer.known_messages.contains(&entry.message_hash) {
435 continue;
436 }
437
438 if !message_allowed(who, intent, &entry.topic, &entry.message) {
439 continue;
440 }
441
442 peer.known_messages.insert(entry.message_hash);
443
444 tracing::trace!(
445 target: "gossip",
446 to = %who,
447 protocol = %self.protocol,
448 ?entry.message,
449 "Sending topic message",
450 );
451 notification_service.send_sync_notification(who, entry.message.clone());
452 }
453 }
454 }
455
456 pub fn multicast(
458 &mut self,
459 notification_service: &mut Box<dyn NotificationService>,
460 topic: B::Hash,
461 message: Vec<u8>,
462 force: bool,
463 ) {
464 let message_hash = HashingFor::<B>::hash(&message);
465 self.register_message_hashed(message_hash, topic, message.clone(), None);
466 let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
467 propagate(
468 notification_service,
469 self.protocol.clone(),
470 iter::once((&message_hash, &topic, &message)),
471 intent,
472 &mut self.peers,
473 &self.validator,
474 );
475 }
476
477 pub fn send_message(
480 &mut self,
481 notification_service: &mut Box<dyn NotificationService>,
482 who: &PeerId,
483 message: Vec<u8>,
484 ) {
485 let peer = match self.peers.get_mut(who) {
486 None => return,
487 Some(peer) => peer,
488 };
489
490 let message_hash = HashingFor::<B>::hash(&message);
491
492 tracing::trace!(
493 target: "gossip",
494 to = %who,
495 protocol = %self.protocol,
496 ?message,
497 "Sending direct message",
498 );
499
500 peer.known_messages.insert(message_hash);
501 notification_service.send_sync_notification(who, message)
502 }
503}
504
505struct Metrics {
506 registered_messages: Counter<U64>,
507 expired_messages: Counter<U64>,
508}
509
510impl Metrics {
511 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
512 Ok(Self {
513 registered_messages: register(
514 Counter::new(
515 "substrate_network_gossip_registered_messages_total",
516 "Number of registered messages by the gossip service.",
517 )?,
518 registry,
519 )?,
520 expired_messages: register(
521 Counter::new(
522 "substrate_network_gossip_expired_messages_total",
523 "Number of expired messages by the gossip service.",
524 )?,
525 registry,
526 )?,
527 })
528 }
529}
530
531#[cfg(test)]
532mod tests {
533 use super::*;
534 use futures::prelude::*;
535 use soil_network::types::multiaddr::Multiaddr;
536 use soil_network::{
537 config::MultiaddrWithPeerId, event::Event, service::traits::NotificationEvent, MessageSink,
538 NetworkBlock, NetworkEventStream, NetworkPeers, ReputationChange,
539 };
540 use std::{
541 collections::HashSet,
542 pin::Pin,
543 sync::{Arc, Mutex},
544 };
545 use subsoil::runtime::{
546 testing::{Block as RawBlock, MockCallU64, TestXt, H256},
547 traits::NumberFor,
548 };
549
550 type Block = RawBlock<TestXt<MockCallU64, ()>>;
551
552 macro_rules! push_msg {
553 ($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
554 if $consensus.known_messages.insert($hash, ()) {
555 $consensus.messages.push(MessageEntry {
556 message_hash: $hash,
557 topic: $topic,
558 message: $m,
559 sender: None,
560 });
561 }
562 };
563 }
564
565 struct AllowAll;
566 impl Validator<Block> for AllowAll {
567 fn validate(
568 &self,
569 _context: &mut dyn ValidatorContext<Block>,
570 _sender: &PeerId,
571 _data: &[u8],
572 ) -> ValidationResult<H256> {
573 ValidationResult::ProcessAndKeep(H256::default())
574 }
575 }
576
577 struct DiscardAll;
578 impl Validator<Block> for DiscardAll {
579 fn validate(
580 &self,
581 _context: &mut dyn ValidatorContext<Block>,
582 _sender: &PeerId,
583 _data: &[u8],
584 ) -> ValidationResult<H256> {
585 ValidationResult::Discard
586 }
587 }
588
589 #[derive(Clone, Default)]
590 struct NoOpNetwork {
591 inner: Arc<Mutex<NoOpNetworkInner>>,
592 }
593
594 #[derive(Clone, Default)]
595 struct NoOpNetworkInner {
596 peer_reports: Vec<(PeerId, ReputationChange)>,
597 }
598
599 #[async_trait::async_trait]
600 impl NetworkPeers for NoOpNetwork {
601 fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
602 unimplemented!();
603 }
604
605 fn set_authorized_only(&self, _reserved_only: bool) {
606 unimplemented!();
607 }
608
609 fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
610 unimplemented!();
611 }
612
613 fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
614 self.inner.lock().unwrap().peer_reports.push((peer_id, cost_benefit));
615 }
616
617 fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
618 unimplemented!()
619 }
620
621 fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
622 unimplemented!();
623 }
624
625 fn accept_unreserved_peers(&self) {
626 unimplemented!();
627 }
628
629 fn deny_unreserved_peers(&self) {
630 unimplemented!();
631 }
632
633 fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
634 unimplemented!();
635 }
636
637 fn remove_reserved_peer(&self, _peer_id: PeerId) {
638 unimplemented!();
639 }
640
641 fn set_reserved_peers(
642 &self,
643 _protocol: ProtocolName,
644 _peers: HashSet<Multiaddr>,
645 ) -> Result<(), String> {
646 unimplemented!();
647 }
648
649 fn add_peers_to_reserved_set(
650 &self,
651 _protocol: ProtocolName,
652 _peers: HashSet<Multiaddr>,
653 ) -> Result<(), String> {
654 unimplemented!();
655 }
656
657 fn remove_peers_from_reserved_set(
658 &self,
659 _protocol: ProtocolName,
660 _peers: Vec<PeerId>,
661 ) -> Result<(), String> {
662 unimplemented!();
663 }
664
665 fn sync_num_connected(&self) -> usize {
666 unimplemented!();
667 }
668
669 fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
670 None
671 }
672
673 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
674 unimplemented!();
675 }
676 }
677
678 impl NetworkEventStream for NoOpNetwork {
679 fn event_stream(&self, _name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
680 unimplemented!();
681 }
682 }
683
684 impl NetworkBlock<<Block as BlockT>::Hash, NumberFor<Block>> for NoOpNetwork {
685 fn announce_block(&self, _hash: <Block as BlockT>::Hash, _data: Option<Vec<u8>>) {
686 unimplemented!();
687 }
688
689 fn new_best_block_imported(
690 &self,
691 _hash: <Block as BlockT>::Hash,
692 _number: NumberFor<Block>,
693 ) {
694 unimplemented!();
695 }
696 }
697
698 #[derive(Debug, Default)]
699 struct NoOpNotificationService {}
700
701 #[async_trait::async_trait]
702 impl NotificationService for NoOpNotificationService {
703 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
705 unimplemented!();
706 }
707
708 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
710 unimplemented!();
711 }
712
713 fn send_sync_notification(&mut self, _peer: &PeerId, _notification: Vec<u8>) {
715 unimplemented!();
716 }
717
718 async fn send_async_notification(
720 &mut self,
721 _peer: &PeerId,
722 _notification: Vec<u8>,
723 ) -> Result<(), soil_network::error::Error> {
724 unimplemented!();
725 }
726
727 async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
729 unimplemented!();
730 }
731
732 fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
733 unimplemented!();
734 }
735
736 async fn next_event(&mut self) -> Option<NotificationEvent> {
738 None
739 }
740
741 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
742 unimplemented!();
743 }
744
745 fn protocol(&self) -> &ProtocolName {
746 unimplemented!();
747 }
748
749 fn message_sink(&self, _peer: &PeerId) -> Option<Box<dyn MessageSink>> {
750 unimplemented!();
751 }
752 }
753
754 #[test]
755 fn collects_garbage() {
756 struct AllowOne;
757 impl Validator<Block> for AllowOne {
758 fn validate(
759 &self,
760 _context: &mut dyn ValidatorContext<Block>,
761 _sender: &PeerId,
762 data: &[u8],
763 ) -> ValidationResult<H256> {
764 if data[0] == 1 {
765 ValidationResult::ProcessAndKeep(H256::default())
766 } else {
767 ValidationResult::Discard
768 }
769 }
770
771 fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
772 Box::new(move |_topic, data| data[0] != 1)
773 }
774 }
775
776 let prev_hash = H256::random();
777 let best_hash = H256::random();
778 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
779 let m1_hash = H256::random();
780 let m2_hash = H256::random();
781 let m1 = vec![1, 2, 3];
782 let m2 = vec![4, 5, 6];
783
784 push_msg!(consensus, prev_hash, m1_hash, m1);
785 push_msg!(consensus, best_hash, m2_hash, m2);
786 consensus.known_messages.insert(m1_hash, ());
787 consensus.known_messages.insert(m2_hash, ());
788
789 consensus.collect_garbage();
790 assert_eq!(consensus.messages.len(), 2);
791 assert_eq!(consensus.known_messages.len(), 2);
792
793 consensus.validator = Arc::new(AllowOne);
794
795 consensus.collect_garbage();
797 assert_eq!(consensus.messages.len(), 1);
798 assert_eq!(consensus.known_messages.len(), 2);
800 assert!(consensus.known_messages.get(&m2_hash).is_some());
801 }
802
803 #[test]
804 fn message_stream_include_those_sent_before_asking() {
805 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
806
807 let message = vec![4, 5, 6];
809 let topic = HashingFor::<Block>::hash(&[1, 2, 3]);
810 consensus.register_message(topic, message.clone());
811
812 assert_eq!(
813 consensus.messages_for(topic).next(),
814 Some(TopicNotification { message, sender: None }),
815 );
816 }
817
818 #[test]
819 fn can_keep_multiple_messages_per_topic() {
820 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
821
822 let topic = [1; 32].into();
823 let msg_a = vec![1, 2, 3];
824 let msg_b = vec![4, 5, 6];
825
826 consensus.register_message(topic, msg_a);
827 consensus.register_message(topic, msg_b);
828
829 assert_eq!(consensus.messages.len(), 2);
830 }
831
832 #[test]
833 fn peer_is_removed_on_disconnect() {
834 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
835
836 let mut notification_service: Box<dyn NotificationService> =
837 Box::new(NoOpNotificationService::default());
838
839 let peer_id = PeerId::random();
840 consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
841 assert!(consensus.peers.contains_key(&peer_id));
842
843 consensus.peer_disconnected(&mut notification_service, peer_id);
844 assert!(!consensus.peers.contains_key(&peer_id));
845 }
846
847 #[test]
848 fn on_incoming_ignores_discarded_messages() {
849 let mut notification_service: Box<dyn NotificationService> =
850 Box::new(NoOpNotificationService::default());
851 let to_forward = ConsensusGossip::<Block>::new(Arc::new(DiscardAll), "/foo".into(), None)
852 .on_incoming(
853 &mut NoOpNetwork::default(),
854 &mut notification_service,
855 PeerId::random(),
856 vec![vec![1, 2, 3]],
857 );
858
859 assert!(
860 to_forward.is_empty(),
861 "Expected `on_incoming` to ignore discarded message but got {:?}",
862 to_forward,
863 );
864 }
865
866 #[test]
867 fn on_incoming_ignores_unregistered_peer() {
868 let mut network = NoOpNetwork::default();
869 let mut notification_service: Box<dyn NotificationService> =
870 Box::new(NoOpNotificationService::default());
871 let remote = PeerId::random();
872
873 let to_forward = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None)
874 .on_incoming(
875 &mut network,
876 &mut notification_service,
877 remote,
879 vec![vec![1, 2, 3]],
880 );
881
882 assert!(
883 to_forward.is_empty(),
884 "Expected `on_incoming` to ignore message from unregistered peer but got {:?}",
885 to_forward,
886 );
887 }
888
889 #[test]
892 fn do_not_report_peer_for_first_time_duplicate_gossip_message() {
893 let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), "/foo".into(), None);
894
895 let mut network = NoOpNetwork::default();
896 let mut notification_service: Box<dyn NotificationService> =
897 Box::new(NoOpNotificationService::default());
898
899 let peer_id = PeerId::random();
900 consensus.new_peer(&mut notification_service, peer_id, ObservedRole::Full);
901 assert!(consensus.peers.contains_key(&peer_id));
902
903 let peer_id2 = PeerId::random();
904 consensus.new_peer(&mut notification_service, peer_id2, ObservedRole::Full);
905 assert!(consensus.peers.contains_key(&peer_id2));
906
907 let message = vec![vec![1, 2, 3]];
908 consensus.on_incoming(&mut network, &mut notification_service, peer_id, message.clone());
909 consensus.on_incoming(&mut network, &mut notification_service, peer_id2, message.clone());
910
911 assert_eq!(
912 vec![(peer_id, rep::GOSSIP_SUCCESS)],
913 network.inner.lock().unwrap().peer_reports
914 );
915 }
916}