1use crate::config::*;
30
31use codec::{Decode, Encode};
32use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt};
33use prometheus_endpoint::{
34 prometheus, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64,
35};
36use sc_network::{
37 config::{NonReservedPeerMode, SetConfig},
38 error, multiaddr,
39 peer_store::PeerStoreProvider,
40 service::{
41 traits::{NotificationEvent, NotificationService, ValidationResult},
42 NotificationMetrics,
43 },
44 types::ProtocolName,
45 utils::{interval, LruHashSet},
46 NetworkBackend, NetworkEventStream, NetworkPeers,
47};
48use sc_network_common::role::ObservedRole;
49use sc_network_sync::{SyncEvent, SyncEventStream};
50use sc_network_types::PeerId;
51use sp_runtime::traits::Block as BlockT;
52use sp_statement_store::{
53 Hash, NetworkPriority, Statement, StatementSource, StatementStore, SubmitResult,
54};
55use std::{
56 collections::{hash_map::Entry, HashMap, HashSet},
57 iter,
58 num::NonZeroUsize,
59 pin::Pin,
60 sync::Arc,
61};
62use tokio::time::timeout;
63
64pub mod config;
65
/// A list of statements; this is the payload decoded from (and encoded into) a single gossip
/// notification.
pub type Statements = Vec<Statement>;
/// Receiving half of a oneshot channel that yields the [`SubmitResult`] of a statement submission.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
70
/// Reputation changes reported to the network for peers, depending on the statements they send.
mod rep {
    use sc_network::ReputationChange as Rep;
    /// Cost charged for every received statement that is not yet in the store, before the
    /// statement has been submitted to the store.
    pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
    /// Refund of `ANY_STATEMENT`, applied when the store reports the statement as already known.
    pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
    /// Reward applied when the store accepts a statement as new with low network priority.
    pub const GOOD_STATEMENT: Rep = Rep::new(1 << 7, "Good statement");
    /// Penalty applied when the store rejects a statement as bad.
    pub const BAD_STATEMENT: Rep = Rep::new(-(1 << 12), "Bad statement");
    /// Penalty applied when the same peer sends a statement it has already sent while that
    /// statement is still tracked as pending.
    pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
    /// Reward applied when the store accepts a statement as new with high network priority.
    pub const EXCELLENT_STATEMENT: Rep = Rep::new(1 << 8, "High priority statement");
}
89
/// Log target used by the log statements in this file.
const LOG_TARGET: &str = "statement-gossip";
/// Upper bound on how long a single asynchronous notification send to a peer may take before
/// propagation to that peer is abandoned.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
93
/// Prometheus metrics maintained by the statement handler.
struct Metrics {
    /// Counts statements sent out; incremented by the chunk length after each sent chunk.
    propagated_statements: Counter<U64>,
    /// Counts statements received via gossip that were already in the statement store.
    known_statements_received: Counter<U64>,
    /// Counts single statements skipped during propagation because they exceed the
    /// notification size limit on their own.
    skipped_oversized_statements: Counter<U64>,
    /// Distribution of the number of statements per sent chunk.
    propagated_statements_chunks: Histogram,
    /// Number of statement submissions still pending; refreshed after each propagation tick.
    pending_statements: Gauge<U64>,
    /// Counts received statements dropped because the pending limit was exceeded.
    ignored_statements: Counter<U64>,
}
102
103impl Metrics {
104 fn register(r: &Registry) -> Result<Self, PrometheusError> {
105 Ok(Self {
106 propagated_statements: register(
107 Counter::new(
108 "substrate_sync_propagated_statements",
109 "Number of statements propagated to at least one peer",
110 )?,
111 r,
112 )?,
113 known_statements_received: register(
114 Counter::new(
115 "substrate_sync_known_statement_received",
116 "Number of statements received via gossiping that were already in the statement store",
117 )?,
118 r,
119 )?,
120 skipped_oversized_statements: register(
121 Counter::new(
122 "substrate_sync_skipped_oversized_statements",
123 "Number of oversized statements that were skipped to be gossiped",
124 )?,
125 r,
126 )?,
127 propagated_statements_chunks: register(
128 Histogram::with_opts(
129 HistogramOpts::new(
130 "substrate_sync_propagated_statements_chunks",
131 "Distribution of chunk sizes when propagating statements",
132 ).buckets(prometheus::exponential_buckets(1.0, 2.0, 14)?),
133 )?,
134 r,
135 )?,
136 pending_statements: register(
137 Gauge::new(
138 "substrate_sync_pending_statement_validations",
139 "Number of pending statement validations",
140 )?,
141 r,
142 )?,
143 ignored_statements: register(
144 Counter::new(
145 "substrate_sync_ignored_statements",
146 "Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
147 )?,
148 r,
149 )?,
150 })
151 }
152}
153
/// Prototype for a [`StatementHandler`]: holds the protocol name and the notification service
/// created by [`StatementHandlerPrototype::new`] until [`StatementHandlerPrototype::build`] is
/// called.
pub struct StatementHandlerPrototype {
    /// Name of the statement notification protocol, derived from the genesis hash and the
    /// optional fork id.
    protocol_name: ProtocolName,
    /// Notification service handle returned by the network backend for this protocol.
    notification_service: Box<dyn NotificationService>,
}
159
160impl StatementHandlerPrototype {
161 pub fn new<
163 Hash: AsRef<[u8]>,
164 Block: BlockT,
165 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
166 >(
167 genesis_hash: Hash,
168 fork_id: Option<&str>,
169 metrics: NotificationMetrics,
170 peer_store_handle: Arc<dyn PeerStoreProvider>,
171 ) -> (Self, Net::NotificationProtocolConfig) {
172 let genesis_hash = genesis_hash.as_ref();
173 let protocol_name = if let Some(fork_id) = fork_id {
174 format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
175 } else {
176 format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
177 };
178 let (config, notification_service) = Net::notification_config(
179 protocol_name.clone().into(),
180 Vec::new(),
181 MAX_STATEMENT_NOTIFICATION_SIZE,
182 None,
183 SetConfig {
184 in_peers: 0,
185 out_peers: 0,
186 reserved_nodes: Vec::new(),
187 non_reserved_mode: NonReservedPeerMode::Deny,
188 },
189 metrics,
190 peer_store_handle,
191 );
192
193 (Self { protocol_name: protocol_name.into(), notification_service }, config)
194 }
195
196 pub fn build<
201 N: NetworkPeers + NetworkEventStream,
202 S: SyncEventStream + sp_consensus::SyncOracle,
203 >(
204 self,
205 network: N,
206 sync: S,
207 statement_store: Arc<dyn StatementStore>,
208 metrics_registry: Option<&Registry>,
209 executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
210 ) -> error::Result<StatementHandler<N, S>> {
211 let sync_event_stream = sync.event_stream("statement-handler-sync");
212 let (queue_sender, mut queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
213
214 let store = statement_store.clone();
215 executor(
216 async move {
217 loop {
218 let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
219 queue_receiver.next().await;
220 match task {
221 None => return,
222 Some((statement, completion)) => {
223 let result = store.submit(statement, StatementSource::Network);
224 if completion.send(result).is_err() {
225 log::debug!(
226 target: LOG_TARGET,
227 "Error sending validation completion"
228 );
229 }
230 },
231 }
232 }
233 }
234 .boxed(),
235 );
236
237 let handler = StatementHandler {
238 protocol_name: self.protocol_name,
239 notification_service: self.notification_service,
240 propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
241 as Pin<Box<dyn Stream<Item = ()> + Send>>)
242 .fuse(),
243 pending_statements: FuturesUnordered::new(),
244 pending_statements_peers: HashMap::new(),
245 network,
246 sync,
247 sync_event_stream: sync_event_stream.fuse(),
248 peers: HashMap::new(),
249 statement_store,
250 queue_sender,
251 metrics: if let Some(r) = metrics_registry {
252 Some(Metrics::register(r)?)
253 } else {
254 None
255 },
256 };
257
258 Ok(handler)
259 }
260}
261
/// Handler for the statement gossip protocol. Call [`StatementHandler::run`] to start processing.
pub struct StatementHandler<
    N: NetworkPeers + NetworkEventStream,
    S: SyncEventStream + sp_consensus::SyncOracle,
> {
    /// Name of the notification protocol; used when adding/removing reserved peers.
    protocol_name: ProtocolName,
    /// Stream whose ticks trigger propagation of recent statements.
    propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
    /// Futures for statements queued for submission to the store. Each resolves to the statement
    /// hash and the submit result (`None` if the completion channel was dropped).
    pending_statements:
        FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
    /// For every pending statement hash, the peers that sent it to us. Used to report each of
    /// them once the submit result is available.
    pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
    /// Network handle used for peer reputation reports, peer roles and reserved-set management.
    network: N,
    /// Syncing oracle; statements are neither accepted nor propagated while major syncing.
    sync: S,
    /// Stream of sync peer connect/disconnect events.
    sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
    /// Handle for receiving notification events and sending notifications.
    notification_service: Box<dyn NotificationService>,
    /// Peers with an open notification stream for this protocol.
    peers: HashMap<PeerId, Peer>,
    /// The statement store that statements are read from and submitted to.
    statement_store: Arc<dyn StatementStore>,
    /// Sender side of the bounded queue that feeds statements to the submission worker.
    queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
    /// Prometheus metrics, present only when a registry was supplied at build time.
    metrics: Option<Metrics>,
}
293
/// Per-peer state tracked while the peer's notification stream is open.
#[derive(Debug)]
struct Peer {
    /// LRU set of statement hashes this peer is considered to know: those it sent to us and
    /// those we selected for sending to it.
    known_statements: LruHashSet<Hash>,
    /// Role of the peer; light peers are skipped during propagation.
    role: ObservedRole,
}
301
302impl<N, S> StatementHandler<N, S>
303where
304 N: NetworkPeers + NetworkEventStream,
305 S: SyncEventStream + sp_consensus::SyncOracle,
306{
307 pub async fn run(mut self) {
310 loop {
311 futures::select! {
312 _ = self.propagate_timeout.next() => {
313 self.propagate_statements().await;
314 self.metrics.as_ref().map(|metrics| {
315 metrics.pending_statements.set(self.pending_statements.len() as u64);
316 });
317 },
318 (hash, result) = self.pending_statements.select_next_some() => {
319 if let Some(peers) = self.pending_statements_peers.remove(&hash) {
320 if let Some(result) = result {
321 peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
322 }
323 } else {
324 log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
325 }
326 },
327 sync_event = self.sync_event_stream.next() => {
328 if let Some(sync_event) = sync_event {
329 self.handle_sync_event(sync_event);
330 } else {
331 return;
333 }
334 }
335 event = self.notification_service.next_event().fuse() => {
336 if let Some(event) = event {
337 self.handle_notification_event(event)
338 } else {
339 return
341 }
342 }
343 }
344 }
345 }
346
347 fn handle_sync_event(&mut self, event: SyncEvent) {
348 match event {
349 SyncEvent::PeerConnected(remote) => {
350 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
351 .collect::<multiaddr::Multiaddr>();
352 let result = self.network.add_peers_to_reserved_set(
353 self.protocol_name.clone(),
354 iter::once(addr).collect(),
355 );
356 if let Err(err) = result {
357 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
358 }
359 },
360 SyncEvent::PeerDisconnected(remote) => {
361 let result = self.network.remove_peers_from_reserved_set(
362 self.protocol_name.clone(),
363 iter::once(remote).collect(),
364 );
365 if let Err(err) = result {
366 log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
367 }
368 },
369 }
370 }
371
372 fn handle_notification_event(&mut self, event: NotificationEvent) {
373 match event {
374 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
375 let result = self
377 .network
378 .peer_role(peer, handshake)
379 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
380 let _ = result_tx.send(result);
381 },
382 NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
383 let Some(role) = self.network.peer_role(peer, handshake) else {
384 log::debug!(target: LOG_TARGET, "role for {peer} couldn't be determined");
385 return
386 };
387
388 let _was_in = self.peers.insert(
389 peer,
390 Peer {
391 known_statements: LruHashSet::new(
392 NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
393 ),
394 role,
395 },
396 );
397 debug_assert!(_was_in.is_none());
398 },
399 NotificationEvent::NotificationStreamClosed { peer } => {
400 let _peer = self.peers.remove(&peer);
401 debug_assert!(_peer.is_some());
402 },
403 NotificationEvent::NotificationReceived { peer, notification } => {
404 if self.sync.is_major_syncing() {
406 log::trace!(
407 target: LOG_TARGET,
408 "{peer}: Ignoring statements while major syncing or offline"
409 );
410 return
411 }
412
413 if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
414 self.on_statements(peer, statements);
415 } else {
416 log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
417 }
418 },
419 }
420 }
421
422 fn on_statements(&mut self, who: PeerId, statements: Statements) {
424 log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
425 if let Some(ref mut peer) = self.peers.get_mut(&who) {
426 let mut statements_left = statements.len() as u64;
427 for s in statements {
428 if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
429 log::debug!(
430 target: LOG_TARGET,
431 "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
432 statements_left,
433 MAX_PENDING_STATEMENTS,
434 );
435 self.metrics.as_ref().map(|metrics| {
436 metrics.ignored_statements.inc_by(statements_left);
437 });
438 break
439 }
440
441 let hash = s.hash();
442 peer.known_statements.insert(hash);
443
444 if self.statement_store.has_statement(&hash) {
445 self.metrics.as_ref().map(|metrics| {
446 metrics.known_statements_received.inc();
447 });
448
449 if let Some(peers) = self.pending_statements_peers.get(&hash) {
450 if peers.contains(&who) {
451 log::trace!(
452 target: LOG_TARGET,
453 "Already received the statement from the same peer {who}.",
454 );
455 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
456 }
457 }
458 continue;
459 }
460
461 self.network.report_peer(who, rep::ANY_STATEMENT);
462
463 match self.pending_statements_peers.entry(hash) {
464 Entry::Vacant(entry) => {
465 let (completion_sender, completion_receiver) = oneshot::channel();
466 match self.queue_sender.try_send((s, completion_sender)) {
467 Ok(()) => {
468 self.pending_statements.push(
469 async move {
470 let res = completion_receiver.await;
471 (hash, res.ok())
472 }
473 .boxed(),
474 );
475 entry.insert(HashSet::from_iter([who]));
476 },
477 Err(async_channel::TrySendError::Full(_)) => {
478 log::debug!(
479 target: LOG_TARGET,
480 "Dropped statement because validation channel is full",
481 );
482 },
483 Err(async_channel::TrySendError::Closed(_)) => {
484 log::trace!(
485 target: LOG_TARGET,
486 "Dropped statement because validation channel is closed",
487 );
488 },
489 }
490 },
491 Entry::Occupied(mut entry) => {
492 if !entry.get_mut().insert(who) {
493 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
495 }
496 },
497 }
498
499 statements_left -= 1;
500 }
501 }
502 }
503
504 fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
505 match import {
506 SubmitResult::New(NetworkPriority::High) =>
507 self.network.report_peer(who, rep::EXCELLENT_STATEMENT),
508 SubmitResult::New(NetworkPriority::Low) =>
509 self.network.report_peer(who, rep::GOOD_STATEMENT),
510 SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
511 SubmitResult::KnownExpired => {},
512 SubmitResult::Ignored => {},
513 SubmitResult::Bad(_) => self.network.report_peer(who, rep::BAD_STATEMENT),
514 SubmitResult::InternalError(_) => {},
515 }
516 }
517
518 pub async fn propagate_statement(&mut self, hash: &Hash) {
520 if self.sync.is_major_syncing() {
522 return
523 }
524
525 log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
526 if let Ok(Some(statement)) = self.statement_store.statement(hash) {
527 self.do_propagate_statements(&[(*hash, statement)]).await;
528 }
529 }
530
531 async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
532 log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
533 for (who, peer) in self.peers.iter_mut() {
534 log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
535
536 if peer.role.is_light() {
538 log::trace!(target: LOG_TARGET, "{} is a light node, skipping propagation", who);
539 continue
540 }
541
542 let to_send = statements
543 .iter()
544 .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
545 .collect::<Vec<_>>();
546 log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
547
548 let mut offset = 0;
549 while offset < to_send.len() {
550 let mut current_end = to_send.len();
552 log::trace!(target: LOG_TARGET, "Looking for better chunk size");
553
554 loop {
555 let chunk = &to_send[offset..current_end];
556 let encoded_size = chunk.encoded_size();
557 log::trace!(target: LOG_TARGET, "Chunk: {} statements, {} KB", chunk.len(), encoded_size / 1024);
558
559 if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize {
561 if let Err(e) = timeout(
562 SEND_TIMEOUT,
563 self.notification_service.send_async_notification(who, chunk.encode()),
564 )
565 .await
566 {
567 log::debug!(target: LOG_TARGET, "Failed to send notification to {}, peer disconnected, skipping further batches: {:?}", who, e);
568 offset = to_send.len();
569 break;
570 }
571 offset = current_end;
572 log::trace!(target: LOG_TARGET, "Sent {} statements ({} KB) to {}, {} left", chunk.len(), encoded_size / 1024, who, to_send.len() - offset);
573 self.metrics.as_ref().map(|metrics| {
574 metrics.propagated_statements.inc_by(chunk.len() as u64);
575 metrics.propagated_statements_chunks.observe(chunk.len() as f64);
576 });
577 break;
578 }
579
580 let split_factor =
582 (encoded_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1;
583 let mut new_chunk_size = (current_end - offset) / split_factor;
584
585 if new_chunk_size == 0 {
587 if chunk.len() == 1 {
588 log::warn!(target: LOG_TARGET, "Statement too large ({} KB), skipping", encoded_size / 1024);
589 self.metrics.as_ref().map(|metrics| {
590 metrics.skipped_oversized_statements.inc();
591 });
592 offset = current_end;
593 break;
594 }
595 new_chunk_size = 1;
597 }
598
599 current_end = offset + new_chunk_size;
601 }
602 }
603 }
604 log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
605 }
606
607 async fn propagate_statements(&mut self) {
609 if self.sync.is_major_syncing() {
611 return
612 }
613
614 let Ok(statements) = self.statement_store.take_recent_statements() else { return };
615 if !statements.is_empty() {
616 self.do_propagate_statements(&statements).await;
617 }
618 }
619}
620
621#[cfg(test)]
622mod tests {
623
624 use super::*;
625 use std::sync::Mutex;
626
    /// Network test double that records every reputation report it receives.
    #[derive(Clone)]
    struct TestNetwork {
        /// Reports pushed by `report_peer`, in call order; shared between clones.
        reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
    }
631
632 impl TestNetwork {
633 fn new() -> Self {
634 Self { reported_peers: Arc::new(Mutex::new(Vec::new())) }
635 }
636
637 fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
638 self.reported_peers.lock().unwrap().clone()
639 }
640 }
641
    // Only `report_peer` is functional; every other method panics with `unimplemented!()` and
    // must not be reached by the tests.
    #[async_trait::async_trait]
    impl NetworkPeers for TestNetwork {
        fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
            unimplemented!()
        }

        fn set_authorized_only(&self, _: bool) {
            unimplemented!()
        }

        fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
            unimplemented!()
        }

        // Records the report so tests can inspect it via `get_reports`.
        fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
            self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
        }

        fn peer_reputation(&self, _: &PeerId) -> i32 {
            unimplemented!()
        }

        fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) {
            unimplemented!()
        }

        fn accept_unreserved_peers(&self) {
            unimplemented!()
        }

        fn deny_unreserved_peers(&self) {
            unimplemented!()
        }

        fn add_reserved_peer(
            &self,
            _: sc_network::config::MultiaddrWithPeerId,
        ) -> Result<(), String> {
            unimplemented!()
        }

        fn remove_reserved_peer(&self, _: PeerId) {
            unimplemented!()
        }

        fn set_reserved_peers(
            &self,
            _: sc_network::ProtocolName,
            _: std::collections::HashSet<sc_network::Multiaddr>,
        ) -> Result<(), String> {
            unimplemented!()
        }

        fn add_peers_to_reserved_set(
            &self,
            _: sc_network::ProtocolName,
            _: std::collections::HashSet<sc_network::Multiaddr>,
        ) -> Result<(), String> {
            unimplemented!()
        }

        fn remove_peers_from_reserved_set(
            &self,
            _: sc_network::ProtocolName,
            _: Vec<PeerId>,
        ) -> Result<(), String> {
            unimplemented!()
        }

        fn sync_num_connected(&self) -> usize {
            unimplemented!()
        }

        fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
            unimplemented!()
        }

        async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
            unimplemented!();
        }
    }
723
    /// Stateless sync test double; see its trait impls for the behaviour it provides.
    struct TestSync {}
725
    // The tests construct the handler's sync event stream directly, so this is never called.
    impl SyncEventStream for TestSync {
        fn event_stream(
            &self,
            _name: &'static str,
        ) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
            unimplemented!()
        }
    }
734
    impl sp_consensus::SyncOracle for TestSync {
        // Always reports "not major syncing", so the handler never takes its syncing early
        // returns in tests.
        fn is_major_syncing(&self) -> bool {
            false
        }

        // Not expected to be called by the tests; panics if it is.
        fn is_offline(&self) -> bool {
            unimplemented!()
        }
    }
744
    // Present only to satisfy the handler's trait bounds; panics if called.
    impl NetworkEventStream for TestNetwork {
        fn event_stream(
            &self,
            _name: &'static str,
        ) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
            unimplemented!()
        }
    }
753
    /// Notification service test double that records every notification it is asked to send.
    #[derive(Debug, Clone)]
    struct TestNotificationService {
        /// `(recipient, payload)` pairs in send order; shared between clones.
        sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
    }
758
759 impl TestNotificationService {
760 fn new() -> Self {
761 Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
762 }
763
764 fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
765 self.sent_notifications.lock().unwrap().clone()
766 }
767 }
768
    // Both send methods record the notification; `next_event` reports an ended stream. All
    // remaining methods panic with `unimplemented!()`.
    #[async_trait::async_trait]
    impl NotificationService for TestNotificationService {
        async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
            unimplemented!()
        }

        async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
            unimplemented!()
        }

        // Records the notification instead of sending it.
        fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
            self.sent_notifications.lock().unwrap().push((*peer, notification));
        }

        // Records the notification and always reports success.
        async fn send_async_notification(
            &mut self,
            peer: &PeerId,
            notification: Vec<u8>,
        ) -> Result<(), sc_network::error::Error> {
            self.sent_notifications.lock().unwrap().push((*peer, notification));
            Ok(())
        }

        async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
            unimplemented!()
        }

        fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
            unimplemented!()
        }

        // Always yields `None`, i.e. behaves like an event stream that has ended.
        async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
            None
        }

        fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
            unimplemented!()
        }

        fn protocol(&self) -> &sc_network::types::ProtocolName {
            unimplemented!()
        }

        fn message_sink(
            &self,
            _peer: &PeerId,
        ) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
            unimplemented!()
        }
    }
819
    /// In-memory statement store test double; its contents are set up directly by the tests.
    #[derive(Clone)]
    struct TestStatementStore {
        /// Statements considered to be in the store; backs `statements` and `has_statement`.
        statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
        /// Statements returned (and drained) by `take_recent_statements`.
        recent_statements:
            Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
    }
826
827 impl TestStatementStore {
828 fn new() -> Self {
829 Self { statements: Default::default(), recent_statements: Default::default() }
830 }
831 }
832
    // Only `statements`, `take_recent_statements` and `has_statement` are functional; every
    // other method panics with `unimplemented!()`.
    impl StatementStore for TestStatementStore {
        // Returns a copy of every `(hash, statement)` pair in `statements`.
        fn statements(
            &self,
        ) -> sp_statement_store::Result<
            Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
        > {
            Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
        }

        // Drains `recent_statements`, so a second call returns an empty list.
        fn take_recent_statements(
            &self,
        ) -> sp_statement_store::Result<
            Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
        > {
            Ok(self.recent_statements.lock().unwrap().drain().collect())
        }

        fn statement(
            &self,
            _hash: &sp_statement_store::Hash,
        ) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
            unimplemented!()
        }

        // Checks `statements` only; `recent_statements` is not consulted.
        fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
            self.statements.lock().unwrap().contains_key(hash)
        }

        fn broadcasts(
            &self,
            _match_all_topics: &[sp_statement_store::Topic],
        ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
            unimplemented!()
        }

        fn posted(
            &self,
            _match_all_topics: &[sp_statement_store::Topic],
            _dest: [u8; 32],
        ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
            unimplemented!()
        }

        fn posted_clear(
            &self,
            _match_all_topics: &[sp_statement_store::Topic],
            _dest: [u8; 32],
        ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
            unimplemented!()
        }

        fn broadcasts_stmt(
            &self,
            _match_all_topics: &[sp_statement_store::Topic],
        ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
            unimplemented!()
        }

        fn posted_stmt(
            &self,
            _match_all_topics: &[sp_statement_store::Topic],
            _dest: [u8; 32],
        ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
            unimplemented!()
        }

        fn posted_clear_stmt(
            &self,
            _match_all_topics: &[sp_statement_store::Topic],
            _dest: [u8; 32],
        ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
            unimplemented!()
        }

        fn submit(
            &self,
            _statement: sp_statement_store::Statement,
            _source: sp_statement_store::StatementSource,
        ) -> sp_statement_store::SubmitResult {
            unimplemented!()
        }

        fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
            unimplemented!()
        }

        fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
            unimplemented!()
        }
    }
923
924 fn build_handler() -> (
925 StatementHandler<TestNetwork, TestSync>,
926 TestStatementStore,
927 TestNetwork,
928 TestNotificationService,
929 async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
930 ) {
931 let statement_store = TestStatementStore::new();
932 let (queue_sender, queue_receiver) = async_channel::bounded(2);
933 let network = TestNetwork::new();
934 let notification_service = TestNotificationService::new();
935 let peer_id = PeerId::random();
936 let mut peers = HashMap::new();
937 peers.insert(
938 peer_id,
939 Peer {
940 known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
941 role: ObservedRole::Full,
942 },
943 );
944
945 let handler = StatementHandler {
946 protocol_name: "/statement/1".into(),
947 notification_service: Box::new(notification_service.clone()),
948 propagate_timeout: (Box::pin(futures::stream::pending())
949 as Pin<Box<dyn Stream<Item = ()> + Send>>)
950 .fuse(),
951 pending_statements: FuturesUnordered::new(),
952 pending_statements_peers: HashMap::new(),
953 network: network.clone(),
954 sync: TestSync {},
955 sync_event_stream: (Box::pin(futures::stream::pending())
956 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
957 .fuse(),
958 peers,
959 statement_store: Arc::new(statement_store.clone()),
960 queue_sender,
961 metrics: None,
962 };
963 (handler, statement_store, network, notification_service, queue_receiver)
964 }
965
966 #[test]
967 fn test_skips_processing_statements_that_already_in_store() {
968 let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
969 build_handler();
970
971 let mut statement1 = Statement::new();
972 statement1.set_plain_data(b"statement1".to_vec());
973 let hash1 = statement1.hash();
974
975 statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
976
977 let mut statement2 = Statement::new();
978 statement2.set_plain_data(b"statement2".to_vec());
979 let hash2 = statement2.hash();
980
981 let peer_id = *handler.peers.keys().next().unwrap();
982
983 handler.on_statements(peer_id, vec![statement1, statement2]);
984
985 let to_submit = queue_receiver.try_recv();
986 assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
987
988 let no_more = queue_receiver.try_recv();
989 assert!(no_more.is_err(), "Expected only one statement to be queued");
990 }
991
992 #[test]
993 fn test_reports_for_duplicate_statements() {
994 let (mut handler, statement_store, network, _notification_service, queue_receiver) =
995 build_handler();
996
997 let peer_id = *handler.peers.keys().next().unwrap();
998
999 let mut statement1 = Statement::new();
1000 statement1.set_plain_data(b"statement1".to_vec());
1001
1002 handler.on_statements(peer_id, vec![statement1.clone()]);
1003 {
1004 let (s, _) = queue_receiver.try_recv().unwrap();
1006 let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1007 handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1008 }
1009
1010 handler.on_statements(peer_id, vec![statement1]);
1011
1012 let reports = network.get_reports();
1013 assert_eq!(
1014 reports,
1015 vec![
1016 (peer_id, rep::ANY_STATEMENT), (peer_id, rep::ANY_STATEMENT_REFUND), (peer_id, rep::DUPLICATE_STATEMENT) ],
1020 "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1021 reports
1022 );
1023 }
1024
1025 #[tokio::test]
1026 async fn test_splits_large_batches_into_smaller_chunks() {
1027 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1028 build_handler();
1029
1030 let num_statements = 30;
1031 let statement_size = 100 * 1024; for i in 0..num_statements {
1033 let mut statement = Statement::new();
1034 let mut data = vec![0u8; statement_size];
1035 data[0] = i as u8;
1036 statement.set_plain_data(data);
1037 let hash = statement.hash();
1038 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1039 }
1040
1041 handler.propagate_statements().await;
1042
1043 let sent = notification_service.get_sent_notifications();
1044 let mut total_statements_sent = 0;
1045 assert!(
1046 sent.len() == 3,
1047 "Expected batch to be split into 3 chunks, but got {} chunks",
1048 sent.len()
1049 );
1050 for (_peer, notification) in sent.iter() {
1051 assert!(
1052 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1053 "Notification size {} exceeds limit {}",
1054 notification.len(),
1055 MAX_STATEMENT_NOTIFICATION_SIZE
1056 );
1057 if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1058 total_statements_sent += stmts.len();
1059 }
1060 }
1061
1062 assert_eq!(
1063 total_statements_sent, num_statements,
1064 "Expected all {} statements to be sent, but only {} were sent",
1065 num_statements, total_statements_sent
1066 );
1067 }
1068
1069 #[tokio::test]
1070 async fn test_skips_only_oversized_statements() {
1071 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1072 build_handler();
1073
1074 let mut statement1 = Statement::new();
1075 statement1.set_plain_data(vec![1u8; 100]);
1076 let hash1 = statement1.hash();
1077 statement_store
1078 .recent_statements
1079 .lock()
1080 .unwrap()
1081 .insert(hash1, statement1.clone());
1082
1083 let mut oversized1 = Statement::new();
1084 oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1085 let hash_oversized1 = oversized1.hash();
1086 statement_store
1087 .recent_statements
1088 .lock()
1089 .unwrap()
1090 .insert(hash_oversized1, oversized1);
1091
1092 let mut statement2 = Statement::new();
1093 statement2.set_plain_data(vec![3u8; 100]);
1094 let hash2 = statement2.hash();
1095 statement_store
1096 .recent_statements
1097 .lock()
1098 .unwrap()
1099 .insert(hash2, statement2.clone());
1100
1101 let mut oversized2 = Statement::new();
1102 oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1103 let hash_oversized2 = oversized2.hash();
1104 statement_store
1105 .recent_statements
1106 .lock()
1107 .unwrap()
1108 .insert(hash_oversized2, oversized2);
1109
1110 let mut statement3 = Statement::new();
1111 statement3.set_plain_data(vec![5u8; 100]);
1112 let hash3 = statement3.hash();
1113 statement_store
1114 .recent_statements
1115 .lock()
1116 .unwrap()
1117 .insert(hash3, statement3.clone());
1118
1119 handler.propagate_statements().await;
1120
1121 let sent = notification_service.get_sent_notifications();
1122
1123 let mut sent_hashes = sent
1124 .iter()
1125 .flat_map(|(_peer, notification)| {
1126 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1127 })
1128 .map(|s| s.hash())
1129 .collect::<Vec<_>>();
1130 sent_hashes.sort();
1131 let mut expected_hashes = vec![hash1, hash2, hash3];
1132 expected_hashes.sort();
1133 assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1134 }
1135}