sc_network_statement/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Statement handling to plug on top of the network service.
20//!
21//! Usage:
22//!
23//! - Use [`StatementHandlerPrototype::new`] to create a prototype.
24//! - Pass the `NonDefaultSetConfig` returned from [`StatementHandlerPrototype::new`] to the network
25//!   configuration as an extra peers set.
//! - Use [`StatementHandlerPrototype::build`] then [`StatementHandler::run`] to obtain a
27//! `Future` that processes statements.
28
29use crate::config::*;
30
31use codec::{Decode, Encode};
32use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt};
33use prometheus_endpoint::{
34	prometheus, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64,
35};
36use sc_network::{
37	config::{NonReservedPeerMode, SetConfig},
38	error, multiaddr,
39	peer_store::PeerStoreProvider,
40	service::{
41		traits::{NotificationEvent, NotificationService, ValidationResult},
42		NotificationMetrics,
43	},
44	types::ProtocolName,
45	utils::{interval, LruHashSet},
46	NetworkBackend, NetworkEventStream, NetworkPeers,
47};
48use sc_network_common::role::ObservedRole;
49use sc_network_sync::{SyncEvent, SyncEventStream};
50use sc_network_types::PeerId;
51use sp_runtime::traits::Block as BlockT;
52use sp_statement_store::{
53	Hash, NetworkPriority, Statement, StatementSource, StatementStore, SubmitResult,
54};
55use std::{
56	collections::{hash_map::Entry, HashMap, HashSet},
57	iter,
58	num::NonZeroUsize,
59	pin::Pin,
60	sync::Arc,
61};
62use tokio::time::timeout;
63
/// Configuration constants and types for the statement handler.
pub mod config;
65
/// A set of statements, as exchanged in a single gossip notification.
pub type Statements = Vec<Statement>;
/// Future resolving to the result of importing a statement into the store.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
70
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Reputation change when a peer sends us any statement.
	///
	/// This forces node to verify it, thus the negative value here. Once statement is verified,
	/// reputation change should be refunded with `ANY_STATEMENT_REFUND`.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Reputation change refunding `ANY_STATEMENT` once the statement turns out not to be
	/// invalid.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
	/// Reputation change when a peer sends us a statement that we didn't know about.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 7, "Good statement");
	/// Reputation change when a peer sends us a bad statement.
	pub const BAD_STATEMENT: Rep = Rep::new(-(1 << 12), "Bad statement");
	/// Reputation change when a peer sends us a duplicate statement.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Reputation change when a peer sends us a particularly useful statement.
	pub const EXCELLENT_STATEMENT: Rep = Rep::new(1 << 8, "High priority statement");
}
89
/// Log target used by this crate.
const LOG_TARGET: &str = "statement-gossip";
/// Maximum time we wait for sending a notification to a peer.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
93
/// Prometheus metrics for the statement gossip protocol.
struct Metrics {
	/// Statements propagated to at least one peer.
	propagated_statements: Counter<U64>,
	/// Statements received via gossip that were already in the statement store.
	known_statements_received: Counter<U64>,
	/// Statements skipped during propagation because a single statement exceeded the
	/// maximum notification size.
	skipped_oversized_statements: Counter<U64>,
	/// Distribution of chunk sizes (statements per notification) when propagating.
	propagated_statements_chunks: Histogram,
	/// Number of statement validations currently in flight.
	pending_statements: Gauge<U64>,
	/// Statements ignored because the `MAX_PENDING_STATEMENTS` limit was exceeded.
	ignored_statements: Counter<U64>,
}
102
impl Metrics {
	/// Register all statement-gossip metrics in the given Prometheus registry.
	///
	/// Returns an error if any metric cannot be created or registered (e.g. duplicate
	/// registration).
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			propagated_statements: register(
				Counter::new(
					"substrate_sync_propagated_statements",
					"Number of statements propagated to at least one peer",
				)?,
				r,
			)?,
			known_statements_received: register(
				Counter::new(
					"substrate_sync_known_statement_received",
					"Number of statements received via gossiping that were already in the statement store",
				)?,
				r,
			)?,
			skipped_oversized_statements: register(
				Counter::new(
					"substrate_sync_skipped_oversized_statements",
					"Number of oversized statements that were skipped to be gossiped",
				)?,
				r,
			)?,
			propagated_statements_chunks: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_propagated_statements_chunks",
						"Distribution of chunk sizes when propagating statements",
					// Powers of two from 1 to 8192.
					).buckets(prometheus::exponential_buckets(1.0, 2.0, 14)?),
				)?,
				r,
			)?,
			pending_statements: register(
				Gauge::new(
					"substrate_sync_pending_statement_validations",
					"Number of pending statement validations",
				)?,
				r,
			)?,
			ignored_statements: register(
				Counter::new(
					"substrate_sync_ignored_statements",
					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
				)?,
				r,
			)?,
		})
	}
}
153
/// Prototype for a [`StatementHandler`].
pub struct StatementHandlerPrototype {
	/// Name of the statement notification protocol (derived from the genesis hash and,
	/// optionally, the fork id).
	protocol_name: ProtocolName,
	/// Handle to the notification protocol, passed on to the built [`StatementHandler`].
	notification_service: Box<dyn NotificationService>,
}
159
impl StatementHandlerPrototype {
	/// Create a new instance.
	///
	/// Returns the prototype together with the notification protocol configuration that must
	/// be passed to the network as an extra peers set.
	pub fn new<
		Hash: AsRef<[u8]>,
		Block: BlockT,
		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
	>(
		genesis_hash: Hash,
		fork_id: Option<&str>,
		metrics: NotificationMetrics,
		peer_store_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, Net::NotificationProtocolConfig) {
		let genesis_hash = genesis_hash.as_ref();
		// The protocol name is scoped to the chain (genesis hash) and, when present, the fork
		// id, so nodes on different chains/forks don't gossip to each other.
		let protocol_name = if let Some(fork_id) = fork_id {
			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
		} else {
			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
		};
		// No free slots: peers are only added explicitly to the reserved set when the syncing
		// layer reports them connected (see `StatementHandler::handle_sync_event`).
		let (config, notification_service) = Net::notification_config(
			protocol_name.clone().into(),
			Vec::new(),
			MAX_STATEMENT_NOTIFICATION_SIZE,
			None,
			SetConfig {
				in_peers: 0,
				out_peers: 0,
				reserved_nodes: Vec::new(),
				non_reserved_mode: NonReservedPeerMode::Deny,
			},
			metrics,
			peer_store_handle,
		);

		(Self { protocol_name: protocol_name.into(), notification_service }, config)
	}

	/// Turns the prototype into the actual handler.
	///
	/// Important: the statements handler is initially disabled and doesn't gossip statements.
	/// Gossiping is enabled when major syncing is done.
	///
	/// `executor` is used to spawn the background task that feeds received statements into the
	/// statement store.
	pub fn build<
		N: NetworkPeers + NetworkEventStream,
		S: SyncEventStream + sp_consensus::SyncOracle,
	>(
		self,
		network: N,
		sync: S,
		statement_store: Arc<dyn StatementStore>,
		metrics_registry: Option<&Registry>,
		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
	) -> error::Result<StatementHandler<N, S>> {
		let sync_event_stream = sync.event_stream("statement-handler-sync");
		let (queue_sender, mut queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);

		let store = statement_store.clone();
		// Off-load store submission to a dedicated task, so that a potentially expensive
		// `submit` call does not block the network event loop. The task exits when the
		// sending side (`queue_sender`) is dropped.
		executor(
			async move {
				loop {
					let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
						queue_receiver.next().await;
					match task {
						None => return,
						Some((statement, completion)) => {
							let result = store.submit(statement, StatementSource::Network);
							// The receiving end may have been dropped; this is not fatal.
							if completion.send(result).is_err() {
								log::debug!(
									target: LOG_TARGET,
									"Error sending validation completion"
								);
							}
						},
					}
				}
			}
			.boxed(),
		);

		let handler = StatementHandler {
			protocol_name: self.protocol_name,
			notification_service: self.notification_service,
			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream: sync_event_stream.fuse(),
			peers: HashMap::new(),
			statement_store,
			queue_sender,
			metrics: if let Some(r) = metrics_registry {
				Some(Metrics::register(r)?)
			} else {
				None
			},
		};

		Ok(handler)
	}
}
261
262/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
> {
	/// Name of the statement notification protocol.
	protocol_name: ProtocolName,
	/// Interval at which we call `propagate_statements`.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// Pending statements verification tasks.
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	/// As multiple peers can send us the same statement, we group
	/// these peers using the statement hash while the statement is
	/// imported. This prevents that we import the same statement
	/// multiple times concurrently.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	/// Network service to use to send messages and manage peers.
	network: N,
	/// Syncing service.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// Notification service.
	notification_service: Box<dyn NotificationService>,
	/// All connected peers.
	peers: HashMap<PeerId, Peer>,
	/// Store used to check for known statements and to submit received ones.
	statement_store: Arc<dyn StatementStore>,
	/// Sends statements to the background validation task spawned in
	/// [`StatementHandlerPrototype::build`].
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
}
293
294/// Peer information
/// Peer information
#[derive(Debug)]
struct Peer {
	/// Holds a set of statements known to this peer.
	known_statements: LruHashSet<Hash>,
	/// Role observed during the handshake; light peers are excluded from propagation.
	role: ObservedRole,
}
301
impl<N, S> StatementHandler<N, S>
where
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
{
	/// Turns the [`StatementHandler`] into a future that should run forever and not be
	/// interrupted.
	///
	/// Drives four event sources concurrently: the propagation timer, completed validation
	/// tasks, syncing events, and notification-protocol events. Returns when either the
	/// syncing event stream or the notification event stream terminates.
	pub async fn run(mut self) {
		loop {
			futures::select! {
				_ = self.propagate_timeout.next() => {
					self.propagate_statements().await;
					// Refresh the pending-validations gauge after each propagation round.
					self.metrics.as_ref().map(|metrics| {
						metrics.pending_statements.set(self.pending_statements.len() as u64);
					});
				},
				(hash, result) = self.pending_statements.select_next_some() => {
					// A validation task finished: report the outcome to every peer that sent
					// us this statement. `result` is `None` if the validation task's
					// completion channel was dropped.
					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
						if let Some(result) = result {
							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
						}
					} else {
						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
					}
				},
				sync_event = self.sync_event_stream.next() => {
					if let Some(sync_event) = sync_event {
						self.handle_sync_event(sync_event);
					} else {
						// Syncing has seemingly closed. Closing as well.
						return;
					}
				}
				event = self.notification_service.next_event().fuse() => {
					if let Some(event) = event {
						self.handle_notification_event(event)
					} else {
						// `Notifications` has seemingly closed. Closing as well.
						return
					}
				}
			}
		}
	}

	/// Mirror the syncing layer's peer set into the reserved set of the statement protocol,
	/// so we only gossip with peers the node is actually syncing with.
	fn handle_sync_event(&mut self, event: SyncEvent) {
		match event {
			SyncEvent::PeerConnected(remote) => {
				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
					.collect::<multiaddr::Multiaddr>();
				let result = self.network.add_peers_to_reserved_set(
					self.protocol_name.clone(),
					iter::once(addr).collect(),
				);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
				}
			},
			SyncEvent::PeerDisconnected(remote) => {
				let result = self.network.remove_peers_from_reserved_set(
					self.protocol_name.clone(),
					iter::once(remote).collect(),
				);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
				}
			},
		}
	}

	/// React to a single notification-protocol event: substream validation, peer
	/// connect/disconnect bookkeeping, and incoming statement notifications.
	fn handle_notification_event(&mut self, event: NotificationEvent) {
		match event {
			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
				// only accept peers whose role can be determined
				let result = self
					.network
					.peer_role(peer, handshake)
					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
				let _ = result_tx.send(result);
			},
			NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
				let Some(role) = self.network.peer_role(peer, handshake) else {
					log::debug!(target: LOG_TARGET, "role for {peer} couldn't be determined");
					return
				};

				// Track the peer with an empty known-statements set; a stream should never be
				// reported open twice for the same peer.
				let _was_in = self.peers.insert(
					peer,
					Peer {
						known_statements: LruHashSet::new(
							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
						),
						role,
					},
				);
				debug_assert!(_was_in.is_none());
			},
			NotificationEvent::NotificationStreamClosed { peer } => {
				let _peer = self.peers.remove(&peer);
				debug_assert!(_peer.is_some());
			},
			NotificationEvent::NotificationReceived { peer, notification } => {
				// Accept statements only when node is not major syncing
				if self.sync.is_major_syncing() {
					log::trace!(
						target: LOG_TARGET,
						"{peer}: Ignoring statements while major syncing or offline"
					);
					return
				}

				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
					self.on_statements(peer, statements);
				} else {
					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
				}
			},
		}
	}

	/// Called when peer sends us new statements.
	///
	/// Marks each statement as known to the sending peer, charges/refunds reputation, and
	/// queues unknown statements for validation. Statements past the
	/// `MAX_PENDING_STATEMENTS` backlog are dropped.
	fn on_statements(&mut self, who: PeerId, statements: Statements) {
		log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
		if let Some(ref mut peer) = self.peers.get_mut(&who) {
			// NOTE(review): `statements_left` is only decremented at the bottom of the loop,
			// which the `continue` for already-known statements skips — so the ignored count
			// below can overcount when known statements precede the break. Confirm intended.
			let mut statements_left = statements.len() as u64;
			for s in statements {
				if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
					log::debug!(
						target: LOG_TARGET,
						"Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
						statements_left,
						MAX_PENDING_STATEMENTS,
					);
					self.metrics.as_ref().map(|metrics| {
						metrics.ignored_statements.inc_by(statements_left);
					});
					break
				}

				let hash = s.hash();
				peer.known_statements.insert(hash);

				// Statement already in the store: no need to validate again, but punish peers
				// that sent it to us twice.
				if self.statement_store.has_statement(&hash) {
					self.metrics.as_ref().map(|metrics| {
						metrics.known_statements_received.inc();
					});

					if let Some(peers) = self.pending_statements_peers.get(&hash) {
						if peers.contains(&who) {
							log::trace!(
								target: LOG_TARGET,
								"Already received the statement from the same peer {who}.",
							);
							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
						}
					}
					continue;
				}

				// Charge up-front for the validation work; refunded on successful import.
				self.network.report_peer(who, rep::ANY_STATEMENT);

				match self.pending_statements_peers.entry(hash) {
					Entry::Vacant(entry) => {
						// First time we see this hash: queue it for validation and remember
						// the sender so the result can be attributed to it.
						let (completion_sender, completion_receiver) = oneshot::channel();
						match self.queue_sender.try_send((s, completion_sender)) {
							Ok(()) => {
								self.pending_statements.push(
									async move {
										let res = completion_receiver.await;
										(hash, res.ok())
									}
									.boxed(),
								);
								entry.insert(HashSet::from_iter([who]));
							},
							Err(async_channel::TrySendError::Full(_)) => {
								log::debug!(
									target: LOG_TARGET,
									"Dropped statement because validation channel is full",
								);
							},
							Err(async_channel::TrySendError::Closed(_)) => {
								log::trace!(
									target: LOG_TARGET,
									"Dropped statement because validation channel is closed",
								);
							},
						}
					},
					Entry::Occupied(mut entry) => {
						if !entry.get_mut().insert(who) {
							// Already received this from the same peer.
							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
						}
					},
				}

				statements_left -= 1;
			}
		}
	}

	/// Adjust the sending peer's reputation based on the outcome of importing a statement.
	fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
		match import {
			SubmitResult::New(NetworkPriority::High) =>
				self.network.report_peer(who, rep::EXCELLENT_STATEMENT),
			SubmitResult::New(NetworkPriority::Low) =>
				self.network.report_peer(who, rep::GOOD_STATEMENT),
			// Refund the `ANY_STATEMENT` charged on receipt.
			SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
			SubmitResult::KnownExpired => {},
			SubmitResult::Ignored => {},
			SubmitResult::Bad(_) => self.network.report_peer(who, rep::BAD_STATEMENT),
			SubmitResult::InternalError(_) => {},
		}
	}

	/// Propagate one statement.
	pub async fn propagate_statement(&mut self, hash: &Hash) {
		// Accept statements only when node is not major syncing
		if self.sync.is_major_syncing() {
			return
		}

		log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
		if let Ok(Some(statement)) = self.statement_store.statement(hash) {
			self.do_propagate_statements(&[(*hash, statement)]).await;
		}
	}

	/// Send the given statements to every connected full peer that doesn't already know them,
	/// batching them into notifications no larger than `MAX_STATEMENT_NOTIFICATION_SIZE`.
	async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
		log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
		for (who, peer) in self.peers.iter_mut() {
			log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);

			// never send statements to light nodes
			if peer.role.is_light() {
				log::trace!(target: LOG_TARGET, "{} is a light node, skipping propagation", who);
				continue
			}

			// `insert` returns true only for statements the peer didn't know yet, and marks
			// them as known so they are not re-sent on later rounds.
			let to_send = statements
				.iter()
				.filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
				.collect::<Vec<_>>();
			log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());

			let mut offset = 0;
			while offset < to_send.len() {
				// Try to send as many statements as possible in one notification
				let mut current_end = to_send.len();
				log::trace!(target: LOG_TARGET, "Looking for better chunk size");

				loop {
					let chunk = &to_send[offset..current_end];
					let encoded_size = chunk.encoded_size();
					log::trace!(target: LOG_TARGET, "Chunk: {} statements, {} KB", chunk.len(), encoded_size / 1024);

					// If chunk fits, send it
					if encoded_size <= MAX_STATEMENT_NOTIFICATION_SIZE as usize {
						// NOTE(review): only the timeout error is handled here; an `Err`
						// returned by `send_async_notification` itself (the inner result) is
						// silently discarded — confirm this is intended.
						if let Err(e) = timeout(
							SEND_TIMEOUT,
							self.notification_service.send_async_notification(who, chunk.encode()),
						)
						.await
						{
							log::debug!(target: LOG_TARGET, "Failed to send notification to {}, peer disconnected, skipping further batches: {:?}", who, e);
							offset = to_send.len();
							break;
						}
						offset = current_end;
						log::trace!(target: LOG_TARGET, "Sent {} statements ({} KB) to {}, {} left", chunk.len(), encoded_size / 1024, who, to_send.len() - offset);
						self.metrics.as_ref().map(|metrics| {
							metrics.propagated_statements.inc_by(chunk.len() as u64);
							metrics.propagated_statements_chunks.observe(chunk.len() as f64);
						});
						break;
					}

					// Size exceeded - split the chunk
					let split_factor =
						(encoded_size / MAX_STATEMENT_NOTIFICATION_SIZE as usize) + 1;
					let mut new_chunk_size = (current_end - offset) / split_factor;

					// Single statement is too large
					if new_chunk_size == 0 {
						if chunk.len() == 1 {
							log::warn!(target: LOG_TARGET, "Statement too large ({} KB), skipping", encoded_size / 1024);
							self.metrics.as_ref().map(|metrics| {
								metrics.skipped_oversized_statements.inc();
							});
							offset = current_end;
							break;
						}
						// Don't skip more than one statement at once
						new_chunk_size = 1;
					}

					// Reduce chunk size and try again
					current_end = offset + new_chunk_size;
				}
			}
		}
		log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
	}

	/// Call when we must propagate ready statements to peers.
	async fn propagate_statements(&mut self) {
		// Send out statements only when node is not major syncing
		if self.sync.is_major_syncing() {
			return
		}

		let Ok(statements) = self.statement_store.take_recent_statements() else { return };
		if !statements.is_empty() {
			self.do_propagate_statements(&statements).await;
		}
	}
}
620
621#[cfg(test)]
622mod tests {
623
624	use super::*;
625	use std::sync::Mutex;
626
	/// Test network that records every reputation change reported for a peer.
	#[derive(Clone)]
	struct TestNetwork {
		reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
	}

	impl TestNetwork {
		fn new() -> Self {
			Self { reported_peers: Arc::new(Mutex::new(Vec::new())) }
		}

		/// Snapshot of all `(peer, reputation change)` reports recorded so far, in order.
		fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
			self.reported_peers.lock().unwrap().clone()
		}
	}
641
	#[async_trait::async_trait]
	impl NetworkPeers for TestNetwork {
		// Only `report_peer` is exercised by these tests; every other method is deliberately
		// left unimplemented and will panic if called.
		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
			unimplemented!()
		}

		fn set_authorized_only(&self, _: bool) {
			unimplemented!()
		}

		fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
			unimplemented!()
		}

		fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
		}

		fn peer_reputation(&self, _: &PeerId) -> i32 {
			unimplemented!()
		}

		fn disconnect_peer(&self, _: PeerId, _: sc_network::ProtocolName) {
			unimplemented!()
		}

		fn accept_unreserved_peers(&self) {
			unimplemented!()
		}

		fn deny_unreserved_peers(&self) {
			unimplemented!()
		}

		fn add_reserved_peer(
			&self,
			_: sc_network::config::MultiaddrWithPeerId,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_reserved_peer(&self, _: PeerId) {
			unimplemented!()
		}

		fn set_reserved_peers(
			&self,
			_: sc_network::ProtocolName,
			_: std::collections::HashSet<sc_network::Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn add_peers_to_reserved_set(
			&self,
			_: sc_network::ProtocolName,
			_: std::collections::HashSet<sc_network::Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_peers_from_reserved_set(
			&self,
			_: sc_network::ProtocolName,
			_: Vec<PeerId>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn sync_num_connected(&self) -> usize {
			unimplemented!()
		}

		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
			unimplemented!()
		}

		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
			unimplemented!();
		}
	}
723
	/// Test syncing service that always reports "not major syncing", so the handler under
	/// test processes and propagates statements.
	struct TestSync {}

	impl SyncEventStream for TestSync {
		fn event_stream(
			&self,
			_name: &'static str,
		) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
			unimplemented!()
		}
	}

	impl sp_consensus::SyncOracle for TestSync {
		fn is_major_syncing(&self) -> bool {
			false
		}

		fn is_offline(&self) -> bool {
			unimplemented!()
		}
	}
744
	// Required by the `StatementHandler` bounds but never called in these tests.
	impl NetworkEventStream for TestNetwork {
		fn event_stream(
			&self,
			_name: &'static str,
		) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
			unimplemented!()
		}
	}
753
	/// Test notification service that records every notification "sent" to a peer instead of
	/// performing network I/O.
	#[derive(Debug, Clone)]
	struct TestNotificationService {
		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
	}

	impl TestNotificationService {
		fn new() -> Self {
			Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
		}

		/// Snapshot of all `(peer, payload)` notifications recorded so far, in order.
		fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
			self.sent_notifications.lock().unwrap().clone()
		}
	}
768
	#[async_trait::async_trait]
	impl NotificationService for TestNotificationService {
		// Only the send methods and `next_event` are exercised; the rest panic if called.
		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!()
		}

		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!()
		}

		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
			self.sent_notifications.lock().unwrap().push((*peer, notification));
		}

		async fn send_async_notification(
			&mut self,
			peer: &PeerId,
			notification: Vec<u8>,
		) -> Result<(), sc_network::error::Error> {
			self.sent_notifications.lock().unwrap().push((*peer, notification));
			Ok(())
		}

		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!()
		}

		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!()
		}

		// Returning `None` makes `StatementHandler::run` terminate immediately.
		async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
			None
		}

		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
			unimplemented!()
		}

		fn protocol(&self) -> &sc_network::types::ProtocolName {
			unimplemented!()
		}

		fn message_sink(
			&self,
			_peer: &PeerId,
		) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
			unimplemented!()
		}
	}
819
	/// In-memory statement store stub backed by two hash maps: the set of "known" statements
	/// and the set of recent statements pending propagation.
	#[derive(Clone)]
	struct TestStatementStore {
		statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
		recent_statements:
			Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
	}

	impl TestStatementStore {
		fn new() -> Self {
			Self { statements: Default::default(), recent_statements: Default::default() }
		}
	}
832
	impl StatementStore for TestStatementStore {
		// Only `statements`, `take_recent_statements` and `has_statement` are implemented;
		// everything else panics if called.
		fn statements(
			&self,
		) -> sp_statement_store::Result<
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
		> {
			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
		}

		fn take_recent_statements(
			&self,
		) -> sp_statement_store::Result<
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
		> {
			// Drain, so repeated calls don't re-propagate the same statements.
			Ok(self.recent_statements.lock().unwrap().drain().collect())
		}

		fn statement(
			&self,
			_hash: &sp_statement_store::Hash,
		) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
			unimplemented!()
		}

		fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
			self.statements.lock().unwrap().contains_key(hash)
		}

		fn broadcasts(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_clear(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn broadcasts_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_clear_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn submit(
			&self,
			_statement: sp_statement_store::Statement,
			_source: sp_statement_store::StatementSource,
		) -> sp_statement_store::SubmitResult {
			unimplemented!()
		}

		fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
			unimplemented!()
		}

		fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
			unimplemented!()
		}
	}
923
	/// Build a [`StatementHandler`] wired to test doubles, pre-populated with one random
	/// connected full peer.
	///
	/// Returns the handler plus handles to the store, network, notification service, and the
	/// receiving end of the validation queue (capacity 2), so tests can inspect side effects.
	fn build_handler() -> (
		StatementHandler<TestNetwork, TestSync>,
		TestStatementStore,
		TestNetwork,
		TestNotificationService,
		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
	) {
		let statement_store = TestStatementStore::new();
		let (queue_sender, queue_receiver) = async_channel::bounded(2);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let peer_id = PeerId::random();
		let mut peers = HashMap::new();
		peers.insert(
			peer_id,
			Peer {
				known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
				role: ObservedRole::Full,
			},
		);

		let handler = StatementHandler {
			protocol_name: "/statement/1".into(),
			notification_service: Box::new(notification_service.clone()),
			// Pending streams: the timers never fire, so tests drive the handler manually.
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync: TestSync {},
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers,
			statement_store: Arc::new(statement_store.clone()),
			queue_sender,
			metrics: None,
		};
		(handler, statement_store, network, notification_service, queue_receiver)
	}
965
966	#[test]
967	fn test_skips_processing_statements_that_already_in_store() {
968		let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
969			build_handler();
970
971		let mut statement1 = Statement::new();
972		statement1.set_plain_data(b"statement1".to_vec());
973		let hash1 = statement1.hash();
974
975		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
976
977		let mut statement2 = Statement::new();
978		statement2.set_plain_data(b"statement2".to_vec());
979		let hash2 = statement2.hash();
980
981		let peer_id = *handler.peers.keys().next().unwrap();
982
983		handler.on_statements(peer_id, vec![statement1, statement2]);
984
985		let to_submit = queue_receiver.try_recv();
986		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
987
988		let no_more = queue_receiver.try_recv();
989		assert!(no_more.is_err(), "Expected only one statement to be queued");
990	}
991
992	#[test]
993	fn test_reports_for_duplicate_statements() {
994		let (mut handler, statement_store, network, _notification_service, queue_receiver) =
995			build_handler();
996
997		let peer_id = *handler.peers.keys().next().unwrap();
998
999		let mut statement1 = Statement::new();
1000		statement1.set_plain_data(b"statement1".to_vec());
1001
1002		handler.on_statements(peer_id, vec![statement1.clone()]);
1003		{
1004			// Manually process statements submission
1005			let (s, _) = queue_receiver.try_recv().unwrap();
1006			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1007			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1008		}
1009
1010		handler.on_statements(peer_id, vec![statement1]);
1011
1012		let reports = network.get_reports();
1013		assert_eq!(
1014			reports,
1015			vec![
1016				(peer_id, rep::ANY_STATEMENT),        // Report for first statement
1017				(peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement
1018				(peer_id, rep::DUPLICATE_STATEMENT)   // Report for duplicate statement
1019			],
1020			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1021			reports
1022		);
1023	}
1024
1025	#[tokio::test]
1026	async fn test_splits_large_batches_into_smaller_chunks() {
1027		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1028			build_handler();
1029
1030		let num_statements = 30;
1031		let statement_size = 100 * 1024; // 100KB per statement
1032		for i in 0..num_statements {
1033			let mut statement = Statement::new();
1034			let mut data = vec![0u8; statement_size];
1035			data[0] = i as u8;
1036			statement.set_plain_data(data);
1037			let hash = statement.hash();
1038			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1039		}
1040
1041		handler.propagate_statements().await;
1042
1043		let sent = notification_service.get_sent_notifications();
1044		let mut total_statements_sent = 0;
1045		assert!(
1046			sent.len() == 3,
1047			"Expected batch to be split into 3 chunks, but got {} chunks",
1048			sent.len()
1049		);
1050		for (_peer, notification) in sent.iter() {
1051			assert!(
1052				notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1053				"Notification size {} exceeds limit {}",
1054				notification.len(),
1055				MAX_STATEMENT_NOTIFICATION_SIZE
1056			);
1057			if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1058				total_statements_sent += stmts.len();
1059			}
1060		}
1061
1062		assert_eq!(
1063			total_statements_sent, num_statements,
1064			"Expected all {} statements to be sent, but only {} were sent",
1065			num_statements, total_statements_sent
1066		);
1067	}
1068
1069	#[tokio::test]
1070	async fn test_skips_only_oversized_statements() {
1071		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1072			build_handler();
1073
1074		let mut statement1 = Statement::new();
1075		statement1.set_plain_data(vec![1u8; 100]);
1076		let hash1 = statement1.hash();
1077		statement_store
1078			.recent_statements
1079			.lock()
1080			.unwrap()
1081			.insert(hash1, statement1.clone());
1082
1083		let mut oversized1 = Statement::new();
1084		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1085		let hash_oversized1 = oversized1.hash();
1086		statement_store
1087			.recent_statements
1088			.lock()
1089			.unwrap()
1090			.insert(hash_oversized1, oversized1);
1091
1092		let mut statement2 = Statement::new();
1093		statement2.set_plain_data(vec![3u8; 100]);
1094		let hash2 = statement2.hash();
1095		statement_store
1096			.recent_statements
1097			.lock()
1098			.unwrap()
1099			.insert(hash2, statement2.clone());
1100
1101		let mut oversized2 = Statement::new();
1102		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1103		let hash_oversized2 = oversized2.hash();
1104		statement_store
1105			.recent_statements
1106			.lock()
1107			.unwrap()
1108			.insert(hash_oversized2, oversized2);
1109
1110		let mut statement3 = Statement::new();
1111		statement3.set_plain_data(vec![5u8; 100]);
1112		let hash3 = statement3.hash();
1113		statement_store
1114			.recent_statements
1115			.lock()
1116			.unwrap()
1117			.insert(hash3, statement3.clone());
1118
1119		handler.propagate_statements().await;
1120
1121		let sent = notification_service.get_sent_notifications();
1122
1123		let mut sent_hashes = sent
1124			.iter()
1125			.flat_map(|(_peer, notification)| {
1126				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1127			})
1128			.map(|s| s.hash())
1129			.collect::<Vec<_>>();
1130		sent_hashes.sort();
1131		let mut expected_hashes = vec![hash1, hash2, hash3];
1132		expected_hashes.sort();
1133		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1134	}
1135}