sc_network_statement/lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Statement handling to plug on top of the network service.
20//!
21//! Usage:
22//!
23//! - Use [`StatementHandlerPrototype::new`] to create a prototype.
24//! - Pass the `NonDefaultSetConfig` returned from [`StatementHandlerPrototype::new`] to the network
25//!   configuration as an extra peers set.
26//! - Use [`StatementHandlerPrototype::build`] to obtain the handler, then call
27//!   [`StatementHandler::run`] to obtain a `Future` that processes statements (sketch below).
28
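//! A rough end-to-end sketch (not compiled here; `Block`, `Net`, `my_network`, `my_sync`,
//! `my_store`, `spawn_task`, and the numeric parameters are placeholders for whatever the
//! embedding node provides):
//!
//! ```ignore
//! let (prototype, set_config) = StatementHandlerPrototype::new::<_, Block, Net>(
//! 	genesis_hash,
//! 	None, // fork id
//! 	notification_metrics,
//! 	peer_store_handle,
//! );
//! // Register `set_config` with the network configuration as an extra peer set.
//!
//! let handler = prototype.build(
//! 	my_network,
//! 	my_sync,
//! 	my_store, // Arc<dyn StatementStore>
//! 	None,     // optional Prometheus registry
//! 	|fut| spawn_task(fut),
//! 	2, // number of submission workers
//! 	DEFAULT_STATEMENTS_PER_SECOND,
//! )?;
//! spawn_task(Box::pin(handler.run()));
//! ```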
29use crate::config::*;
30
31use codec::{Compact, Decode, Encode, MaxEncodedLen};
32#[cfg(any(test, feature = "test-helpers"))]
33use futures::future::pending;
34use futures::{channel::oneshot, future::FusedFuture, prelude::*, stream::FuturesUnordered};
35use governor::{
36	clock::DefaultClock,
37	state::{InMemoryState, NotKeyed},
38	Quota, RateLimiter,
39};
40use prometheus_endpoint::{
41	exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
42	Registry, U64,
43};
44use sc_network::{
45	config::{NonReservedPeerMode, SetConfig},
46	error, multiaddr,
47	peer_store::PeerStoreProvider,
48	service::{
49		traits::{NotificationEvent, NotificationService, ValidationResult},
50		NotificationMetrics,
51	},
52	types::ProtocolName,
53	utils::{interval, LruHashSet},
54	NetworkBackend, NetworkEventStream, NetworkPeers,
55};
56use sc_network_sync::{SyncEvent, SyncEventStream};
57use sc_network_types::PeerId;
58use sp_runtime::traits::Block as BlockT;
59use sp_statement_store::{
60	FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
61};
62use std::{
63	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
64	iter,
65	num::{NonZeroU32, NonZeroUsize},
66	pin::Pin,
67	sync::Arc,
68	time::Instant,
69};
70use tokio::time::timeout;
71pub mod config;
72
73/// A set of statements.
74pub type Statements = Vec<Statement>;
75/// Future resolving to statement import result.
76pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
77
78mod rep {
79	use sc_network::ReputationChange as Rep;
80	/// Reputation change when a peer sends us any statement.
81	///
82/// This forces the node to verify it, thus the negative value here. Once the statement is
83/// verified, the reputation change should be refunded with `ANY_STATEMENT_REFUND`.
84	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
85	/// Reputation change when a peer sends us any statement that is not invalid.
86	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
87/// Reputation change when a peer sends us a statement that we didn't know about.
88	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
89	/// Reputation change when a peer sends us an invalid statement.
90	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
91	/// Reputation change when a peer sends us a duplicate statement.
92	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
93	/// Reputation change when a peer floods us with statements.
94	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
95}
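// For orientation, the deltas above combine as follows in the handler below: a previously
// unknown, valid statement nets `ANY_STATEMENT + GOOD_STATEMENT` = -16 + 256 = +240 for the
// sending peer, a statement that turns out to be already known to the store nets
// `ANY_STATEMENT + ANY_STATEMENT_REFUND` = 0 after the refund, and an invalid statement nets
// `ANY_STATEMENT + INVALID_STATEMENT` = -16 - 4096 = -4112.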
96
97const LOG_TARGET: &str = "statement-gossip";
98/// Maximum time we wait for sending a notification to a peer.
99const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
100/// Interval for sending statement batches during initial sync to new peers.
101const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
102
103struct Metrics {
104	propagated_statements: Counter<U64>,
105	known_statements_received: Counter<U64>,
106	skipped_oversized_statements: Counter<U64>,
107	propagated_statements_chunks: Histogram,
108	pending_statements: Gauge<U64>,
109	ignored_statements: Counter<U64>,
110	peers_connected: Gauge<U64>,
111	statements_received: Counter<U64>,
112	bytes_sent_total: Counter<U64>,
113	bytes_received_total: Counter<U64>,
114	sent_latency_seconds: Histogram,
115	initial_sync_statements_sent: Counter<U64>,
116	initial_sync_bursts_total: Counter<U64>,
117	initial_sync_peers_active: Gauge<U64>,
118	initial_sync_duration_seconds: Histogram,
119	statement_flooding_detected: Counter<U64>,
120}
121
122impl Metrics {
123	fn register(r: &Registry) -> Result<Self, PrometheusError> {
124		Ok(Self {
125			propagated_statements: register(
126				Counter::new(
127					"substrate_sync_propagated_statements",
128					"Number of statements propagated to at least one peer",
129				)?,
130				r,
131			)?,
132			known_statements_received: register(
133				Counter::new(
134					"substrate_sync_known_statement_received",
135					"Number of statements received via gossiping that were already in the statement store",
136				)?,
137				r,
138			)?,
139			skipped_oversized_statements: register(
140				Counter::new(
141					"substrate_sync_skipped_oversized_statements",
142					"Number of oversized statements skipped during gossiping",
143				)?,
144				r,
145			)?,
146			propagated_statements_chunks: register(
147				Histogram::with_opts(
148					HistogramOpts::new(
149						"substrate_sync_propagated_statements_chunks",
150						"Distribution of chunk sizes when propagating statements",
151					)
152					.buckets(exponential_buckets(1.0, 2.0, 14)?),
153				)?,
154				r,
155			)?,
156			pending_statements: register(
157				Gauge::new(
158					"substrate_sync_pending_statement_validations",
159					"Number of pending statement validations",
160				)?,
161				r,
162			)?,
163			ignored_statements: register(
164				Counter::new(
165					"substrate_sync_ignored_statements",
166					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
167				)?,
168				r,
169			)?,
170			peers_connected: register(
171				Gauge::new(
172					"substrate_sync_statement_peers_connected",
173					"Number of peers connected using the statement protocol",
174				)?,
175				r,
176			)?,
177			statements_received: register(
178				Counter::new(
179					"substrate_sync_statements_received",
180					"Total number of statements received from peers",
181				)?,
182				r,
183			)?,
184			bytes_sent_total: register(
185				Counter::new(
186					"substrate_sync_statement_bytes_sent_total",
187					"Total bytes sent for statement protocol messages",
188				)?,
189				r,
190			)?,
191			bytes_received_total: register(
192				Counter::new(
193					"substrate_sync_statement_bytes_received_total",
194					"Total bytes received for statement protocol messages",
195				)?,
196				r,
197			)?,
198			sent_latency_seconds: register(
199				Histogram::with_opts(
200					HistogramOpts::new(
201						"substrate_sync_statement_sent_latency_seconds",
202						"Time to send statement messages to peers",
203					)
204					// Buckets from 1μs to ~1s, covering the microsecond to second range.
205					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
206				)?,
207				r,
208			)?,
209			initial_sync_statements_sent: register(
210				Counter::new(
211					"substrate_sync_initial_sync_statements_sent",
212					"Total statements sent during initial sync bursts to newly connected peers",
213				)?,
214				r,
215			)?,
216			initial_sync_bursts_total: register(
217				Counter::new(
218					"substrate_sync_initial_sync_bursts_total",
219					"Total number of initial sync burst rounds processed",
220				)?,
221				r,
222			)?,
223			initial_sync_peers_active: register(
224				Gauge::new(
225					"substrate_sync_initial_sync_peers_active",
226					"Number of peers currently being synced via initial sync",
227				)?,
228				r,
229			)?,
230			initial_sync_duration_seconds: register(
231				Histogram::with_opts(
232					HistogramOpts::new(
233						"substrate_sync_initial_sync_duration_seconds",
234						"Per-peer total duration of initial sync from start to completion",
235					)
236					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
237				)?,
238				r,
239			)?,
240			statement_flooding_detected: register(
241				Counter::new(
242					"substrate_sync_statement_flooding_detected",
243					"Number of peers disconnected for exceeding statement rate limits",
244				)?,
245				r,
246			)?,
247		})
248	}
249}
250
251/// Prototype for a [`StatementHandler`].
252pub struct StatementHandlerPrototype {
253	protocol_name: ProtocolName,
254	notification_service: Box<dyn NotificationService>,
255}
256
257impl StatementHandlerPrototype {
258	/// Create a new instance.
259	pub fn new<
260		Hash: AsRef<[u8]>,
261		Block: BlockT,
262		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
263	>(
264		genesis_hash: Hash,
265		fork_id: Option<&str>,
266		metrics: NotificationMetrics,
267		peer_store_handle: Arc<dyn PeerStoreProvider>,
268	) -> (Self, Net::NotificationProtocolConfig) {
269		let genesis_hash = genesis_hash.as_ref();
270		let protocol_name = if let Some(fork_id) = fork_id {
271			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
272		} else {
273			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
274		};
275		let (config, notification_service) = Net::notification_config(
276			protocol_name.clone().into(),
277			Vec::new(),
278			MAX_STATEMENT_NOTIFICATION_SIZE,
279			None,
280			SetConfig {
281				in_peers: 0,
282				out_peers: 0,
283				reserved_nodes: Vec::new(),
284				non_reserved_mode: NonReservedPeerMode::Deny,
285			},
286			metrics,
287			peer_store_handle,
288		);
289
290		(Self { protocol_name: protocol_name.into(), notification_service }, config)
291	}
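	// For reference, and using a hypothetical genesis hash, the protocol names produced by
	// the `format!` calls above look like:
	//
	//     /abcd…ef01/statement/1          (no fork id)
	//     /abcd…ef01/my-fork/statement/1  (fork id = "my-fork")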
292
293	/// Turns the prototype into the actual handler.
294	///
295	/// Important: the statement handler is initially disabled and doesn't gossip statements.
296	/// Gossiping is enabled when major syncing is done.
297	pub fn build<
298		N: NetworkPeers + NetworkEventStream,
299		S: SyncEventStream + sp_consensus::SyncOracle,
300	>(
301		self,
302		network: N,
303		sync: S,
304		statement_store: Arc<dyn StatementStore>,
305		metrics_registry: Option<&Registry>,
306		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
307		mut num_submission_workers: usize,
308		statements_per_second: u32,
309	) -> error::Result<StatementHandler<N, S>> {
310		let sync_event_stream = sync.event_stream("statement-handler-sync");
311		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
312
313		if num_submission_workers == 0 {
314			log::warn!(
315				target: LOG_TARGET,
316				"num_submission_workers is 0, defaulting to 1"
317			);
318			num_submission_workers = 1;
319		}
320
321		let statements_per_second = match NonZeroU32::new(statements_per_second) {
322			Some(rate) => rate,
323			None => {
324				log::warn!(
325					target: LOG_TARGET,
326					"statements_per_second is 0, defaulting to {}",
327					DEFAULT_STATEMENTS_PER_SECOND
328				);
329				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
330					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
331			},
332		};
333
334		let metrics =
335			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };
336
337		for _ in 0..num_submission_workers {
338			let store = statement_store.clone();
339			let mut queue_receiver = queue_receiver.clone();
340			executor(
341				async move {
342					loop {
343						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
344							queue_receiver.next().await;
345						match task {
346							None => return,
347							Some((statement, completion)) => {
348								let result = store.submit(statement, StatementSource::Network);
349								if completion.send(result).is_err() {
350									log::debug!(
351										target: LOG_TARGET,
352										"Error sending validation completion"
353									);
354								}
355							},
356						}
357					}
358				}
359				.boxed(),
360			);
361		}
362
363		let handler = StatementHandler {
364			protocol_name: self.protocol_name,
365			notification_service: self.notification_service,
366			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
367				as Pin<Box<dyn Stream<Item = ()> + Send>>)
368				.fuse(),
369			pending_statements: FuturesUnordered::new(),
370			pending_statements_peers: HashMap::new(),
371			network,
372			sync,
373			sync_event_stream: sync_event_stream.fuse(),
374			peers: HashMap::new(),
375			statement_store,
376			queue_sender,
377			statements_per_second,
378			metrics,
379			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
380			pending_initial_syncs: HashMap::new(),
381			initial_sync_peer_queue: VecDeque::new(),
382		};
383
384		Ok(handler)
385	}
386}
387
388/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
389pub struct StatementHandler<
390	N: NetworkPeers + NetworkEventStream,
391	S: SyncEventStream + sp_consensus::SyncOracle,
392> {
393	protocol_name: ProtocolName,
394	/// Interval at which we call `propagate_statements`.
395	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
396	/// Pending statements verification tasks.
397	pending_statements:
398		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
399	/// As multiple peers can send us the same statement, we group
400	/// these peers using the statement hash while the statement is
402	/// imported. This prevents us from importing the same statement
403	/// multiple times concurrently.
403	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
404	/// Network service to use to send messages and manage peers.
405	network: N,
406	/// Syncing service.
407	sync: S,
408	/// Receiver for syncing-related events.
409	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
410	/// Notification service.
411	notification_service: Box<dyn NotificationService>,
412	/// All connected peers.
413	peers: HashMap<PeerId, Peer>,
414	statement_store: Arc<dyn StatementStore>,
415	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
416	/// Maximum statements per second per peer.
417	statements_per_second: NonZeroU32,
418	/// Prometheus metrics.
419	metrics: Option<Metrics>,
420	/// Timeout for sending next statement batch during initial sync.
421	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
422	/// Pending initial syncs per peer.
423	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
424	/// Queue for round-robin processing of initial syncs.
425	initial_sync_peer_queue: VecDeque<PeerId>,
426}
427
428/// Per-peer rate limiter using a token bucket algorithm.
429///
430/// The token bucket allows short bursts up to the configured burst capacity while
431/// enforcing the average per-second rate over time.
432#[derive(Debug)]
433struct PeerRateLimiter {
434	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
435}
436
437impl PeerRateLimiter {
438	fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
439		let quota = Quota::per_second(statements_per_second).allow_burst(burst);
440		Self { limiter: RateLimiter::direct(quota) }
441	}
442
443	/// Check if receiving `count` statements would exceed the rate limit.
444	fn is_flooding(&self, count: usize) -> bool {
445		if count > u32::MAX as usize {
446			return true;
447		}
448
449		let Some(n) = NonZeroU32::new(count as u32) else {
450			return false;
451		};
452		!matches!(self.limiter.check_n(n), Ok(Ok(())))
453	}
454}
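// A minimal usage sketch for the limiter above, with hypothetical numbers: at a rate of
// 10 statements/s and a burst capacity of 20, a first batch of 20 passes while an
// immediate second batch trips the limit until tokens replenish.
//
//     let limiter = PeerRateLimiter::new(
//         NonZeroU32::new(10).expect("nonzero"),
//         NonZeroU32::new(20).expect("nonzero"),
//     );
//     assert!(!limiter.is_flooding(20)); // within burst capacity
//     assert!(limiter.is_flooding(20));  // bucket drained, refills at 10/s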
455
456/// Peer information
457#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
458#[derive(Debug)]
459pub struct Peer {
460	/// Holds a set of statements known to this peer.
461	known_statements: LruHashSet<Hash>,
462	/// Rate limiter for statement flooding protection.
463	rate_limiter: PeerRateLimiter,
464}
465
466/// Tracks pending initial sync state for a peer (hashes only, statements fetched on-demand).
467struct PendingInitialSync {
468	hashes: Vec<Hash>,
469	started_at: Instant,
470}
471
472/// Result of finding a sendable chunk of statements.
473enum ChunkResult {
474	/// Found a chunk that fits. Contains the end index (exclusive).
475	Send(usize),
476	/// First statement is oversized, skip it.
477	SkipOversized,
478}
479
480/// Result of sending a chunk of statements.
481enum SendChunkResult {
482	/// Successfully sent a chunk of N statements.
483	Sent(usize),
484	/// First statement was oversized and skipped.
485	Skipped,
486	/// Nothing to send.
487	Empty,
488	/// Send failed.
489	Failed,
490}
491
492/// Returns the maximum payload size for statement notifications.
493///
494/// This reserves space for encoding the length of the vector (`Compact<u32>`),
495/// ensuring the final encoded message fits within `MAX_STATEMENT_NOTIFICATION_SIZE`.
496fn max_statement_payload_size() -> usize {
497	MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len()
498}
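// For orientation: under SCALE, `Compact::<u32>::max_encoded_len()` is 5 bytes, so the
// payload budget is effectively `MAX_STATEMENT_NOTIFICATION_SIZE - 5` (the constant itself
// is defined in the `config` module).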
499
500/// Find the largest chunk of statements starting from the beginning that fits
501/// within MAX_STATEMENT_NOTIFICATION_SIZE.
502///
503/// Uses an incremental approach: adds statements one by one until the limit is reached.
504/// This is efficient because we only compute sizes for statements we'll actually send
505/// in this chunk, rather than computing sizes for all statements upfront.
506fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
507	if statements.is_empty() {
508		return ChunkResult::Send(0);
509	}
510	let max_size = max_statement_payload_size();
511
512	// Incrementally add statements until we exceed the limit.
513	// This is efficient because we only compute sizes for statements in this chunk.
514	// accumulated_size is the sum of encoded sizes of all statements so far (without vec
515	// overhead).
516	let mut accumulated_size = 0;
517	let mut count = 0usize;
518
519	for stmt in statements {
520		let stmt_size = stmt.encoded_size();
521		// The vector length prefix is already reserved by `max_statement_payload_size`,
522		// so only the encoded statement sizes are accumulated here.
523		let new_total = accumulated_size + stmt_size;
524		if new_total > max_size {
525			break;
526		}
527
528		accumulated_size += stmt_size;
529		count += 1;
530	}
531
532	// If we couldn't fit even a single statement, skip it.
533	if count == 0 {
534		ChunkResult::SkipOversized
535	} else {
536		ChunkResult::Send(count)
537	}
538}
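// A worked illustration with hypothetical sizes, assuming a ~1 MiB payload budget:
//
//     // encoded sizes: [600 KiB, 600 KiB, 200 KiB]
//     // find_sendable_chunk(&all)      -> ChunkResult::Send(1)  (adding the 2nd would overflow)
//     // find_sendable_chunk(&all[1..]) -> ChunkResult::Send(2)  (600 KiB + 200 KiB fits)
//     // a single statement larger than the budget -> ChunkResult::SkipOversized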
539
540impl Peer {
541	/// Create a new peer for testing/benchmarking purposes.
542	#[cfg(any(test, feature = "test-helpers"))]
543	pub fn new_for_testing(
544		known_statements: LruHashSet<Hash>,
545		statements_per_second: NonZeroU32,
546		burst: NonZeroU32,
547	) -> Self {
548		Self { known_statements, rate_limiter: PeerRateLimiter::new(statements_per_second, burst) }
549	}
550}
551
552impl<N, S> StatementHandler<N, S>
553where
554	N: NetworkPeers + NetworkEventStream,
555	S: SyncEventStream + sp_consensus::SyncOracle,
556{
557	/// Create a new `StatementHandler` for testing/benchmarking purposes.
558	#[cfg(any(test, feature = "test-helpers"))]
559	pub fn new_for_testing(
560		protocol_name: ProtocolName,
561		notification_service: Box<dyn NotificationService>,
562		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
563		network: N,
564		sync: S,
565		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
566		peers: HashMap<PeerId, Peer>,
567		statement_store: Arc<dyn StatementStore>,
568		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
569		statements_per_second: NonZeroU32,
570	) -> Self {
571		Self {
572			protocol_name,
573			notification_service,
574			propagate_timeout,
575			pending_statements: FuturesUnordered::new(),
576			pending_statements_peers: HashMap::new(),
577			network,
578			sync,
579			sync_event_stream,
580			peers,
581			statement_store,
582			queue_sender,
583			statements_per_second,
584			metrics: None,
585			initial_sync_timeout: Box::pin(pending().fuse()),
586			pending_initial_syncs: HashMap::new(),
587			initial_sync_peer_queue: VecDeque::new(),
588		}
589	}
590
591	/// Get mutable access to pending statements for testing/benchmarking.
592	#[cfg(any(test, feature = "test-helpers"))]
593	pub fn pending_statements_mut(
594		&mut self,
595	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
596	{
597		&mut self.pending_statements
598	}
599
600	/// Turns the [`StatementHandler`] into a future that should run forever and not be
601	/// interrupted.
602	pub async fn run(mut self) {
603		loop {
604			futures::select_biased! {
605				_ = self.propagate_timeout.next() => {
606					self.propagate_statements().await;
607					self.metrics.as_ref().map(|metrics| {
608						metrics.pending_statements.set(self.pending_statements.len() as u64);
609					});
610				},
611				(hash, result) = self.pending_statements.select_next_some() => {
612					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
613						if let Some(result) = result {
614							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
615						}
616					} else {
617						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
618					}
619				},
620				sync_event = self.sync_event_stream.next() => {
621					if let Some(sync_event) = sync_event {
622						self.handle_sync_event(sync_event);
623					} else {
624						// Syncing has seemingly closed. Closing as well.
625						return;
626					}
627				}
628				event = self.notification_service.next_event().fuse() => {
629					if let Some(event) = event {
630						self.handle_notification_event(event).await
631					} else {
632						// `Notifications` has seemingly closed. Closing as well.
633						return
634					}
635				}
636				_ = &mut self.initial_sync_timeout => {
637					self.process_initial_sync_burst().await;
638					self.initial_sync_timeout =
639						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
640				},
641			}
642		}
643	}
644
645	/// Send a single chunk of statements to a peer.
646	async fn send_statement_chunk(
647		&mut self,
648		peer: &PeerId,
649		statements: &[&Statement],
650	) -> SendChunkResult {
651		match find_sendable_chunk(statements) {
652			ChunkResult::Send(0) => SendChunkResult::Empty,
653			ChunkResult::Send(chunk_end) => {
654				let chunk = &statements[..chunk_end];
655				let encoded = chunk.encode();
656				let bytes_to_send = encoded.len() as u64;
657
658				let sent_latency_timer =
659					self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
660				let send_result = timeout(
661					SEND_TIMEOUT,
662					self.notification_service.send_async_notification(peer, encoded),
663				)
664				.await;
665				drop(sent_latency_timer);
666
667				if let Err(e) = send_result {
668					log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
669					return SendChunkResult::Failed;
670				}
671
672				log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
673				self.metrics.as_ref().map(|metrics| {
674					metrics.propagated_statements.inc_by(chunk.len() as u64);
675					metrics.bytes_sent_total.inc_by(bytes_to_send);
676					metrics.propagated_statements_chunks.observe(chunk.len() as f64);
677				});
678				SendChunkResult::Sent(chunk_end)
679			},
680			ChunkResult::SkipOversized => {
681				log::warn!(target: LOG_TARGET, "Statement too large, skipping");
682				self.metrics.as_ref().map(|metrics| {
683					metrics.skipped_oversized_statements.inc();
684				});
685				SendChunkResult::Skipped
686			},
687		}
688	}
689
690	fn handle_sync_event(&mut self, event: SyncEvent) {
691		match event {
692			SyncEvent::PeerConnected(remote) => {
693				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
694					.collect::<multiaddr::Multiaddr>();
695				let result = self.network.add_peers_to_reserved_set(
696					self.protocol_name.clone(),
697					iter::once(addr).collect(),
698				);
699				if let Err(err) = result {
700					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
701				}
702			},
703			SyncEvent::PeerDisconnected(remote) => {
704				let result = self.network.remove_peers_from_reserved_set(
705					self.protocol_name.clone(),
706					iter::once(remote).collect(),
707				);
708				if let Err(err) = result {
709					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
710				}
711			},
712		}
713	}
714
715	async fn handle_notification_event(&mut self, event: NotificationEvent) {
716		match event {
717			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
718				// Only accept peers whose role can be determined
719				let result = self
720					.network
721					.peer_role(peer, handshake)
722					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
723				let _ = result_tx.send(result);
724			},
725			NotificationEvent::NotificationStreamOpened { peer, .. } => {
726				let _was_in = self.peers.insert(
727					peer,
728					Peer {
729						known_statements: LruHashSet::new(
730							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
731						),
732						rate_limiter: PeerRateLimiter::new(
733							self.statements_per_second,
734							NonZeroU32::new(
735								self.statements_per_second.get() *
736									config::STATEMENTS_BURST_COEFFICIENT,
737							)
738							.expect("burst capacity is nonzero"),
739						),
740					},
741				);
742				debug_assert!(_was_in.is_none());
743
744				self.metrics.as_ref().map(|metrics| {
745					metrics.peers_connected.set(self.peers.len() as u64);
746				});
747
748				if !self.sync.is_major_syncing() {
749					let hashes = self.statement_store.statement_hashes();
750					if !hashes.is_empty() {
751						self.pending_initial_syncs.insert(
752							peer,
753							PendingInitialSync { hashes, started_at: Instant::now() },
754						);
755						self.initial_sync_peer_queue.push_back(peer);
756						self.metrics.as_ref().map(|metrics| {
757							metrics.initial_sync_peers_active.inc();
758						});
759					}
760				}
761			},
762			NotificationEvent::NotificationStreamClosed { peer } => {
763				let _peer = self.peers.remove(&peer);
764				debug_assert!(_peer.is_some());
765				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
766					self.metrics.as_ref().map(|metrics| {
767						metrics.initial_sync_peers_active.dec();
768						metrics
769							.initial_sync_duration_seconds
770							.observe(pending.started_at.elapsed().as_secs_f64());
771					});
772				}
773				self.initial_sync_peer_queue.retain(|p| *p != peer);
774				self.metrics.as_ref().map(|metrics| {
775					metrics.peers_connected.set(self.peers.len() as u64);
776				});
777			},
778			NotificationEvent::NotificationReceived { peer, notification } => {
779				let bytes_received = notification.len() as u64;
780				self.metrics.as_ref().map(|metrics| {
781					metrics.bytes_received_total.inc_by(bytes_received);
782				});
783
784				// Accept statements only when the node is not major syncing
785				if self.sync.is_major_syncing() {
786					log::trace!(
787						target: LOG_TARGET,
788						"{peer}: Ignoring statements while major syncing"
789					);
790					return;
791				}
792
793				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
794					self.on_statements(peer, statements);
795				} else {
796					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
797				}
798			},
799		}
800	}
801
802	/// Called when peer sends us new statements
803	#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
804	pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
805		log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
806
807		self.metrics.as_ref().map(|metrics| {
808			metrics.statements_received.inc_by(statements.len() as u64);
809		});
810
811		if let Some(ref mut peer) = self.peers.get_mut(&who) {
812			if peer.rate_limiter.is_flooding(statements.len()) {
813				log::warn!(
814					target: LOG_TARGET,
815					"Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
816					who,
817					self.statements_per_second
818				);
819
820				self.network.report_peer(who, rep::STATEMENT_FLOODING);
821				self.network.disconnect_peer(who, self.protocol_name.clone());
822				if let Some(ref metrics) = self.metrics {
823					metrics.statement_flooding_detected.inc();
824				}
825
826				// Clean up peer state immediately
827				self.peers.remove(&who);
828				self.pending_initial_syncs.remove(&who);
829				self.initial_sync_peer_queue.retain(|p| *p != who);
830
831				return;
832			}
833
834			let mut statements_left = statements.len() as u64;
835			for s in statements {
836				if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
837					log::debug!(
838						target: LOG_TARGET,
839						"Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
840						statements_left,
841						MAX_PENDING_STATEMENTS,
842					);
843					self.metrics.as_ref().map(|metrics| {
844						metrics.ignored_statements.inc_by(statements_left);
845					});
846					break;
847				}
848
849				let hash = s.hash();
850				peer.known_statements.insert(hash);
851
852				if self.statement_store.has_statement(&hash) {
853					self.metrics.as_ref().map(|metrics| {
854						metrics.known_statements_received.inc();
855					});
856
857					if let Some(peers) = self.pending_statements_peers.get(&hash) {
858						if peers.contains(&who) {
859							log::trace!(
860								target: LOG_TARGET,
861								"Already received the statement from the same peer {who}.",
862							);
863							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
864						}
865					}
866					continue;
867				}
868
869				self.network.report_peer(who, rep::ANY_STATEMENT);
870
871				match self.pending_statements_peers.entry(hash) {
872					Entry::Vacant(entry) => {
873						let (completion_sender, completion_receiver) = oneshot::channel();
874						match self.queue_sender.try_send((s, completion_sender)) {
875							Ok(()) => {
876								self.pending_statements.push(
877									async move {
878										let res = completion_receiver.await;
879										(hash, res.ok())
880									}
881									.boxed(),
882								);
883								entry.insert(HashSet::from_iter([who]));
884							},
885							Err(async_channel::TrySendError::Full(_)) => {
886								log::debug!(
887									target: LOG_TARGET,
888									"Dropped statement because validation channel is full",
889								);
890							},
891							Err(async_channel::TrySendError::Closed(_)) => {
892								log::trace!(
893									target: LOG_TARGET,
894									"Dropped statement because validation channel is closed",
895								);
896							},
897						}
898					},
899					Entry::Occupied(mut entry) => {
900						if !entry.get_mut().insert(who) {
901							// Already received this from the same peer.
902							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
903						}
904					},
905				}
906
907				statements_left -= 1;
908			}
909		}
910	}
911
912	fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
913		match import {
914			SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
915			SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
916			SubmitResult::KnownExpired => {},
917			SubmitResult::Rejected(_) => {},
918			SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
919			SubmitResult::InternalError(_) => {},
920		}
921	}
922
923	/// Propagate one statement.
924	pub async fn propagate_statement(&mut self, hash: &Hash) {
925		// Propagate statements only when the node is not major syncing
926		if self.sync.is_major_syncing() {
927			return;
928		}
929
930		log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
931		if let Ok(Some(statement)) = self.statement_store.statement(hash) {
932			self.do_propagate_statements(&[(*hash, statement)]).await;
933		}
934	}
935
936	/// Propagate the given `statements` to the given peer `who`.
937	///
938	/// Internally filters `statements` to only send unknown statements to the peer.
939	async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
940		let Some(peer) = self.peers.get_mut(who) else {
941			return;
942		};
943
944		let to_send: Vec<_> = statements
945			.iter()
946			.filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
947			.collect();
948
949		log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
950
951		if to_send.is_empty() {
952			return;
953		}
954
955		self.send_statements_in_chunks(who, &to_send).await;
956	}
957
958	/// Send statements to a peer in chunks, respecting the maximum notification size.
959	async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
960		let mut offset = 0;
961		while offset < statements.len() {
962			match self.send_statement_chunk(who, &statements[offset..]).await {
963				SendChunkResult::Sent(chunk_end) => {
964					offset += chunk_end;
965				},
966				SendChunkResult::Skipped => {
967					offset += 1;
968				},
969				SendChunkResult::Empty | SendChunkResult::Failed => return,
970			}
971		}
972	}
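	// A minimal walk-through of the loop above with hypothetical sizes and a ~1 MiB budget:
	// for statements encoding to [600 KiB, 2 MiB, 200 KiB], the first call returns `Sent(1)`
	// (offset -> 1), the oversized statement yields `Skipped` (offset -> 2), and the last
	// call returns `Sent(1)`, so everything except the oversized statement is delivered.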
973
974	async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
975		log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
976		let peers: Vec<_> = self.peers.keys().copied().collect();
977		for who in peers {
978			log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
979			self.send_statements_to_peer(&who, statements).await;
980		}
981		log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
982	}
983
984	/// Call when we must propagate ready statements to peers.
985	async fn propagate_statements(&mut self) {
986		// Send out statements only when the node is not major syncing
987		if self.sync.is_major_syncing() {
988			return;
989		}
990
991		let Ok(statements) = self.statement_store.take_recent_statements() else { return };
992		if !statements.is_empty() {
993			self.do_propagate_statements(&statements).await;
994		}
995	}
996
997	/// Record initial sync completion metrics for a peer being removed.
998	fn record_initial_sync_completion(&self, started_at: Instant) {
999		self.metrics.as_ref().map(|metrics| {
1000			metrics.initial_sync_peers_active.dec();
1001			metrics
1002				.initial_sync_duration_seconds
1003				.observe(started_at.elapsed().as_secs_f64());
1004		});
1005	}
1006
1007	/// Process one batch of initial sync for the next peer in the queue (round-robin).
1008	async fn process_initial_sync_burst(&mut self) {
1009		if self.sync.is_major_syncing() {
1010			return;
1011		}
1012
1013		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
1014			return;
1015		};
1016
1017		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
1018			return;
1019		};
1020
1021		self.metrics.as_ref().map(|metrics| {
1022			metrics.initial_sync_bursts_total.inc();
1023		});
1024
1025		if entry.get().hashes.is_empty() {
1026			let started_at = entry.get().started_at;
1027			entry.remove();
1028			self.record_initial_sync_completion(started_at);
1029			return;
1030		}
1031
1032		// Fetch statements up to max_statement_payload_size (reserves space for vec encoding)
1033		let max_size = max_statement_payload_size();
1034		let mut accumulated_size = 0;
1035		let (statements, processed) = match self.statement_store.statements_by_hashes(
1036			&entry.get().hashes,
1037			&mut |_hash, encoded, _stmt| {
1038				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
1039					return FilterDecision::Abort;
1040				}
1041				accumulated_size += encoded.len();
1042				FilterDecision::Take
1043			},
1044		) {
1045			Ok(r) => r,
1046			Err(e) => {
1047				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
1048				let started_at = entry.get().started_at;
1049				entry.remove();
1050				self.record_initial_sync_completion(started_at);
1051				return;
1052			},
1053		};
1054
1055		// Drain processed hashes and check if more remain
1056		entry.get_mut().hashes.drain(..processed);
1057		let has_more = !entry.get().hashes.is_empty();
1058		drop(entry);
1059
1060		// Send statements (already sized to fit in one message)
1061		let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
1062		match self.send_statement_chunk(&peer_id, &to_send).await {
1063			SendChunkResult::Failed => {
1064				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1065					self.record_initial_sync_completion(pending.started_at);
1066				}
1067				return;
1068			},
1069			SendChunkResult::Sent(sent) => {
1070				debug_assert_eq!(to_send.len(), sent);
1071				self.metrics.as_ref().map(|metrics| {
1072					metrics.initial_sync_statements_sent.inc_by(sent as u64);
1073				});
1074				// Mark statements as known
1075				if let Some(peer) = self.peers.get_mut(&peer_id) {
1076					for (hash, _) in &statements {
1077						peer.known_statements.insert(*hash);
1078					}
1079				}
1080			},
1081			SendChunkResult::Empty | SendChunkResult::Skipped => {},
1082		}
1083
1084		// Re-queue if more hashes remain
1085		if has_more {
1086			self.initial_sync_peer_queue.push_back(peer_id);
1087		} else {
1088			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1089				self.record_initial_sync_completion(pending.started_at);
1090			}
1091		}
1092	}
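	// For orientation: one burst serves one peer, and bursts fire every
	// `INITIAL_SYNC_BURST_INTERVAL` (100 ms). With two freshly connected peers A and B that
	// each need three message-sized batches, the tick sequence is A, B, A, B, A, B, so both
	// finish their initial sync after roughly six ticks (~600 ms) instead of one peer
	// monopolizing the stream.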
1093}
1094
1095#[cfg(test)]
1096mod tests {
1097
1098	use super::*;
1099	use std::sync::Mutex;
1100
1101	#[derive(Clone)]
1102	struct TestNetwork {
1103		reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
1104		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
1105	}
1106
1107	impl TestNetwork {
1108		fn new() -> Self {
1109			Self {
1110				reported_peers: Arc::new(Mutex::new(Vec::new())),
1111				disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1112			}
1113		}
1114
1115		fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
1116			self.reported_peers.lock().unwrap().clone()
1117		}
1118
1119		fn get_disconnected_peers(&self) -> Vec<PeerId> {
1120			self.disconnected_peers.lock().unwrap().clone()
1121		}
1122	}
1123
1124	#[async_trait::async_trait]
1125	impl NetworkPeers for TestNetwork {
1126		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
1127			unimplemented!()
1128		}
1129
1130		fn set_authorized_only(&self, _: bool) {
1131			unimplemented!()
1132		}
1133
1134		fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
1135			unimplemented!()
1136		}
1137
1138		fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
1139			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
1140		}
1141
1142		fn peer_reputation(&self, _: &PeerId) -> i32 {
1143			unimplemented!()
1144		}
1145
1146		fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
1147			self.disconnected_peers.lock().unwrap().push(peer);
1148		}
1149
1150		fn accept_unreserved_peers(&self) {
1151			unimplemented!()
1152		}
1153
1154		fn deny_unreserved_peers(&self) {
1155			unimplemented!()
1156		}
1157
1158		fn add_reserved_peer(
1159			&self,
1160			_: sc_network::config::MultiaddrWithPeerId,
1161		) -> Result<(), String> {
1162			unimplemented!()
1163		}
1164
1165		fn remove_reserved_peer(&self, _: PeerId) {
1166			unimplemented!()
1167		}
1168
1169		fn set_reserved_peers(
1170			&self,
1171			_: sc_network::ProtocolName,
1172			_: std::collections::HashSet<sc_network::Multiaddr>,
1173		) -> Result<(), String> {
1174			unimplemented!()
1175		}
1176
1177		fn add_peers_to_reserved_set(
1178			&self,
1179			_: sc_network::ProtocolName,
1180			_: std::collections::HashSet<sc_network::Multiaddr>,
1181		) -> Result<(), String> {
1182			unimplemented!()
1183		}
1184
1185		fn remove_peers_from_reserved_set(
1186			&self,
1187			_: sc_network::ProtocolName,
1188			_: Vec<PeerId>,
1189		) -> Result<(), String> {
1190			unimplemented!()
1191		}
1192
1193		fn sync_num_connected(&self) -> usize {
1194			unimplemented!()
1195		}
1196
1197		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
1198			unimplemented!()
1199		}
1200
1201		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
1202			unimplemented!();
1203		}
1204	}
1205
1206	struct TestSync {}
1207
1208	impl SyncEventStream for TestSync {
1209		fn event_stream(
1210			&self,
1211			_name: &'static str,
1212		) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
1213			unimplemented!()
1214		}
1215	}
1216
1217	impl sp_consensus::SyncOracle for TestSync {
1218		fn is_major_syncing(&self) -> bool {
1219			false
1220		}
1221
1222		fn is_offline(&self) -> bool {
1223			unimplemented!()
1224		}
1225	}
1226
1227	impl NetworkEventStream for TestNetwork {
1228		fn event_stream(
1229			&self,
1230			_name: &'static str,
1231		) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
1232			unimplemented!()
1233		}
1234	}
1235
1236	#[derive(Debug, Clone)]
1237	struct TestNotificationService {
1238		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
1239	}
1240
1241	impl TestNotificationService {
1242		fn new() -> Self {
1243			Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1244		}
1245
1246		fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1247			self.sent_notifications.lock().unwrap().clone()
1248		}
1249	}
1250
1251	#[async_trait::async_trait]
1252	impl NotificationService for TestNotificationService {
1253		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1254			unimplemented!()
1255		}
1256
1257		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1258			unimplemented!()
1259		}
1260
1261		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
1262			self.sent_notifications.lock().unwrap().push((*peer, notification));
1263		}
1264
1265		async fn send_async_notification(
1266			&mut self,
1267			peer: &PeerId,
1268			notification: Vec<u8>,
1269		) -> Result<(), sc_network::error::Error> {
1270			self.sent_notifications.lock().unwrap().push((*peer, notification));
1271			Ok(())
1272		}
1273
1274		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1275			unimplemented!()
1276		}
1277
1278		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1279			unimplemented!()
1280		}
1281
1282		async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
1283			None
1284		}
1285
1286		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
1287			unimplemented!()
1288		}
1289
1290		fn protocol(&self) -> &sc_network::types::ProtocolName {
1291			unimplemented!()
1292		}
1293
1294		fn message_sink(
1295			&self,
1296			_peer: &PeerId,
1297		) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
1298			unimplemented!()
1299		}
1300	}
1301
1302	#[derive(Clone)]
1303	struct TestStatementStore {
1304		statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1305		recent_statements:
1306			Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1307	}
1308
1309	impl TestStatementStore {
1310		fn new() -> Self {
1311			Self { statements: Default::default(), recent_statements: Default::default() }
1312		}
1313	}
1314
1315	impl StatementStore for TestStatementStore {
1316		fn statements(
1317			&self,
1318		) -> sp_statement_store::Result<
1319			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1320		> {
1321			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
1322		}
1323
1324		fn take_recent_statements(
1325			&self,
1326		) -> sp_statement_store::Result<
1327			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1328		> {
1329			Ok(self.recent_statements.lock().unwrap().drain().collect())
1330		}
1331
1332		fn statement(
1333			&self,
1334			_hash: &sp_statement_store::Hash,
1335		) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
1336			unimplemented!()
1337		}
1338
1339		fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
1340			self.statements.lock().unwrap().contains_key(hash)
1341		}
1342
1343		fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
1344			self.statements.lock().unwrap().keys().cloned().collect()
1345		}
1346
1347		fn statements_by_hashes(
1348			&self,
1349			hashes: &[sp_statement_store::Hash],
1350			filter: &mut dyn FnMut(
1351				&sp_statement_store::Hash,
1352				&[u8],
1353				&sp_statement_store::Statement,
1354			) -> FilterDecision,
1355		) -> sp_statement_store::Result<(
1356			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1357			usize,
1358		)> {
1359			let statements = self.statements.lock().unwrap();
1360			let mut result = Vec::new();
1361			let mut processed = 0;
1362			for hash in hashes {
1363				let Some(stmt) = statements.get(hash) else {
1364					processed += 1;
1365					continue;
1366				};
1367				let encoded = stmt.encode();
1368				match filter(hash, &encoded, stmt) {
1369					FilterDecision::Skip => {
1370						processed += 1;
1371					},
1372					FilterDecision::Take => {
1373						processed += 1;
1374						result.push((*hash, stmt.clone()));
1375					},
1376					FilterDecision::Abort => break,
1377				}
1378			}
1379			Ok((result, processed))
1380		}
1381
1382		fn broadcasts(
1383			&self,
1384			_match_all_topics: &[sp_statement_store::Topic],
1385		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1386			unimplemented!()
1387		}
1388
1389		fn posted(
1390			&self,
1391			_match_all_topics: &[sp_statement_store::Topic],
1392			_dest: [u8; 32],
1393		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1394			unimplemented!()
1395		}
1396
1397		fn posted_clear(
1398			&self,
1399			_match_all_topics: &[sp_statement_store::Topic],
1400			_dest: [u8; 32],
1401		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1402			unimplemented!()
1403		}
1404
1405		fn broadcasts_stmt(
1406			&self,
1407			_match_all_topics: &[sp_statement_store::Topic],
1408		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1409			unimplemented!()
1410		}
1411
1412		fn posted_stmt(
1413			&self,
1414			_match_all_topics: &[sp_statement_store::Topic],
1415			_dest: [u8; 32],
1416		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1417			unimplemented!()
1418		}
1419
1420		fn posted_clear_stmt(
1421			&self,
1422			_match_all_topics: &[sp_statement_store::Topic],
1423			_dest: [u8; 32],
1424		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1425			unimplemented!()
1426		}
1427
1428		fn submit(
1429			&self,
1430			_statement: sp_statement_store::Statement,
1431			_source: sp_statement_store::StatementSource,
1432		) -> sp_statement_store::SubmitResult {
1433			unimplemented!()
1434		}
1435
1436		fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
1437			unimplemented!()
1438		}
1439
1440		fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
1441			unimplemented!()
1442		}
1443	}
1444
1445	fn build_handler() -> (
1446		StatementHandler<TestNetwork, TestSync>,
1447		TestStatementStore,
1448		TestNetwork,
1449		TestNotificationService,
1450		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
1451	) {
1452		let statement_store = TestStatementStore::new();
1453		let (queue_sender, queue_receiver) = async_channel::bounded(2);
1454		let network = TestNetwork::new();
1455		let notification_service = TestNotificationService::new();
1456		let peer_id = PeerId::random();
1457		let mut peers = HashMap::new();
1458		peers.insert(
1459			peer_id,
1460			Peer {
1461				known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
1462				rate_limiter: PeerRateLimiter::new(
1463					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1464						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1465					NonZeroU32::new(
1466						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
1467					)
1468					.expect("burst capacity is nonzero"),
1469				),
1470			},
1471		);
1472
1473		let handler = StatementHandler {
1474			protocol_name: "/statement/1".into(),
1475			notification_service: Box::new(notification_service.clone()),
1476			propagate_timeout: (Box::pin(futures::stream::pending())
1477				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1478				.fuse(),
1479			pending_statements: FuturesUnordered::new(),
1480			pending_statements_peers: HashMap::new(),
1481			network: network.clone(),
1482			sync: TestSync {},
1483			sync_event_stream: (Box::pin(futures::stream::pending())
1484				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1485				.fuse(),
1486			peers,
1487			statement_store: Arc::new(statement_store.clone()),
1488			queue_sender,
1489			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1490				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1491			metrics: None,
1492			initial_sync_timeout: Box::pin(futures::future::pending()),
1493			pending_initial_syncs: HashMap::new(),
1494			initial_sync_peer_queue: VecDeque::new(),
1495		};
1496		(handler, statement_store, network, notification_service, queue_receiver)
1497	}
1498
1499	#[tokio::test]
1500	async fn test_skips_processing_statements_that_already_in_store() {
1501		let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
1502			build_handler();
1503
1504		let mut statement1 = Statement::new();
1505		statement1.set_plain_data(b"statement1".to_vec());
1506		let hash1 = statement1.hash();
1507
1508		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
1509
1510		let mut statement2 = Statement::new();
1511		statement2.set_plain_data(b"statement2".to_vec());
1512		let hash2 = statement2.hash();
1513
1514		let peer_id = *handler.peers.keys().next().unwrap();
1515
1516		handler.on_statements(peer_id, vec![statement1, statement2]);
1517
1518		let to_submit = queue_receiver.try_recv();
1519		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
1520
1521		let no_more = queue_receiver.try_recv();
1522		assert!(no_more.is_err(), "Expected only one statement to be queued");
1523	}
1524
1525	#[tokio::test]
1526	async fn test_reports_for_duplicate_statements() {
1527		let (mut handler, statement_store, network, _notification_service, queue_receiver) =
1528			build_handler();
1529
1530		let peer_id = *handler.peers.keys().next().unwrap();
1531
1532		let mut statement1 = Statement::new();
1533		statement1.set_plain_data(b"statement1".to_vec());
1534
1535		handler.on_statements(peer_id, vec![statement1.clone()]);
1536		{
1537			// Manually process statements submission
1538			let (s, _) = queue_receiver.try_recv().unwrap();
1539			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1540			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1541		}
1542
1543		handler.on_statements(peer_id, vec![statement1]);
1544
1545		let reports = network.get_reports();
1546		assert_eq!(
1547			reports,
1548			vec![
1549				(peer_id, rep::ANY_STATEMENT),        // Report for first statement
1550				(peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement
1551				(peer_id, rep::DUPLICATE_STATEMENT)   // Report for duplicate statement
1552			],
1553			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1554			reports
1555		);
1556	}
1557
1558	#[tokio::test]
1559	async fn test_splits_large_batches_into_smaller_chunks() {
1560		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1561			build_handler();
1562
1563		let num_statements = 30;
1564		let statement_size = 100 * 1024; // 100KB per statement
1565		for i in 0..num_statements {
1566			let mut statement = Statement::new();
1567			let mut data = vec![0u8; statement_size];
1568			data[0] = i as u8;
1569			statement.set_plain_data(data);
1570			let hash = statement.hash();
1571			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1572		}
1573
1574		handler.propagate_statements().await;
1575
1576		let sent = notification_service.get_sent_notifications();
1577		let mut total_statements_sent = 0;
1578		assert!(
1579			sent.len() == 3,
1580			"Expected batch to be split into 3 chunks, but got {} chunks",
1581			sent.len()
1582		);
1583		for (_peer, notification) in sent.iter() {
1584			assert!(
1585				notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1586				"Notification size {} exceeds limit {}",
1587				notification.len(),
1588				MAX_STATEMENT_NOTIFICATION_SIZE
1589			);
1590			if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1591				total_statements_sent += stmts.len();
1592			}
1593		}
1594
1595		assert_eq!(
1596			total_statements_sent, num_statements,
1597			"Expected all {} statements to be sent, but only {} were sent",
1598			num_statements, total_statements_sent
1599		);
1600	}
1601
1602	#[tokio::test]
1603	async fn test_skips_only_oversized_statements() {
1604		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1605			build_handler();
1606
1607		let mut statement1 = Statement::new();
1608		statement1.set_plain_data(vec![1u8; 100]);
1609		let hash1 = statement1.hash();
1610		statement_store
1611			.recent_statements
1612			.lock()
1613			.unwrap()
1614			.insert(hash1, statement1.clone());
1615
1616		let mut oversized1 = Statement::new();
1617		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1618		let hash_oversized1 = oversized1.hash();
1619		statement_store
1620			.recent_statements
1621			.lock()
1622			.unwrap()
1623			.insert(hash_oversized1, oversized1);
1624
1625		let mut statement2 = Statement::new();
1626		statement2.set_plain_data(vec![3u8; 100]);
1627		let hash2 = statement2.hash();
1628		statement_store
1629			.recent_statements
1630			.lock()
1631			.unwrap()
1632			.insert(hash2, statement2.clone());
1633
1634		let mut oversized2 = Statement::new();
1635		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1636		let hash_oversized2 = oversized2.hash();
1637		statement_store
1638			.recent_statements
1639			.lock()
1640			.unwrap()
1641			.insert(hash_oversized2, oversized2);
1642
1643		let mut statement3 = Statement::new();
1644		statement3.set_plain_data(vec![5u8; 100]);
1645		let hash3 = statement3.hash();
1646		statement_store
1647			.recent_statements
1648			.lock()
1649			.unwrap()
1650			.insert(hash3, statement3.clone());
1651
1652		handler.propagate_statements().await;
1653
1654		let sent = notification_service.get_sent_notifications();
1655
1656		let mut sent_hashes = sent
1657			.iter()
1658			.flat_map(|(_peer, notification)| {
1659				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1660			})
1661			.map(|s| s.hash())
1662			.collect::<Vec<_>>();
1663		sent_hashes.sort();
1664		let mut expected_hashes = vec![hash1, hash2, hash3];
1665		expected_hashes.sort();
1666		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1667	}
1668
1669	fn build_handler_no_peers() -> (
1670		StatementHandler<TestNetwork, TestSync>,
1671		TestStatementStore,
1672		TestNetwork,
1673		TestNotificationService,
1674	) {
1675		let statement_store = TestStatementStore::new();
1676		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
1677		let network = TestNetwork::new();
1678		let notification_service = TestNotificationService::new();
1679
1680		let handler = StatementHandler {
1681			protocol_name: "/statement/1".into(),
1682			notification_service: Box::new(notification_service.clone()),
1683			propagate_timeout: (Box::pin(futures::stream::pending())
1684				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1685				.fuse(),
1686			pending_statements: FuturesUnordered::new(),
1687			pending_statements_peers: HashMap::new(),
1688			network: network.clone(),
1689			sync: TestSync {},
1690			sync_event_stream: (Box::pin(futures::stream::pending())
1691				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1692				.fuse(),
1693			peers: HashMap::new(),
1694			statement_store: Arc::new(statement_store.clone()),
1695			queue_sender,
1696			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1697				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1698			metrics: None,
1699			initial_sync_timeout: Box::pin(futures::future::pending()),
1700			pending_initial_syncs: HashMap::new(),
1701			initial_sync_peer_queue: VecDeque::new(),
1702		};
1703		(handler, statement_store, network, notification_service)
1704	}
1705
1706	#[tokio::test]
1707	async fn test_initial_sync_burst_single_peer() {
1708		let (mut handler, statement_store, _network, notification_service) =
1709			build_handler_no_peers();
1710
1711		// Create 20MB of statements (200 statements x 100KB each)
1712		// Using 100KB ensures ~10 statements per 1MB batch, requiring ~20 bursts
1713		let num_statements = 200;
1714		let statement_size = 100 * 1024; // 100KB per statement
1715		let mut expected_hashes = Vec::new();
1716		for i in 0..num_statements {
1717			let mut statement = Statement::new();
1718			let mut data = vec![0u8; statement_size];
1719			// Use two bytes for uniqueness so this still works if num_statements grows past 256
1720			data[0] = (i % 256) as u8;
1721			data[1] = (i / 256) as u8;
1722			statement.set_plain_data(data);
1723			let hash = statement.hash();
1724			expected_hashes.push(hash);
1725			statement_store.statements.lock().unwrap().insert(hash, statement);
1726		}
1727
1728		// Setup peer and simulate connection
1729		let peer_id = PeerId::random();
1730
1731		handler
1732			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
1733				peer: peer_id,
1734				direction: sc_network::service::traits::Direction::Inbound,
1735				handshake: vec![],
1736				negotiated_fallback: None,
1737			})
1738			.await;
1739
1740		// Verify peer was added and initial sync was queued
1741		assert!(handler.peers.contains_key(&peer_id));
1742		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
1743		assert_eq!(handler.initial_sync_peer_queue.len(), 1);
1744
1745		// Process bursts until all statements are sent
1746		let mut burst_count = 0;
1747		while handler.pending_initial_syncs.contains_key(&peer_id) {
1748			handler.process_initial_sync_burst().await;
1749			burst_count += 1;
1750			// Safety limit
1751			assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
1752		}
1753
1754		// Verify multiple bursts were needed
1755		// With 200 statements x 100KB each and ~1MB per burst, we expect roughly 20 bursts
1756		assert!(
1757			burst_count >= 10,
1758			"Expected multiple bursts for 200 statements of 100KB each, got {}",
1759			burst_count
1760		);
1761
1762		// Verify all statements were sent
1763		let sent = notification_service.get_sent_notifications();
1764		let mut sent_hashes: Vec<_> = sent
1765			.iter()
1766			.flat_map(|(peer, notification)| {
1767				assert_eq!(*peer, peer_id);
1768				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1769			})
1770			.map(|s| s.hash())
1771			.collect();
1772		sent_hashes.sort();
1773		expected_hashes.sort();
1774
1775		assert_eq!(
1776			sent_hashes.len(),
1777			expected_hashes.len(),
1778			"Expected {} statements to be sent, got {}",
1779			expected_hashes.len(),
1780			sent_hashes.len()
1781		);
1782		assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
1783
1784		// Verify cleanup
1785		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
1786		assert!(handler.initial_sync_peer_queue.is_empty());
1787	}
1788
1789	#[tokio::test]
1790	async fn test_initial_sync_burst_multiple_peers_round_robin() {
1791		let (mut handler, statement_store, _network, notification_service) =
1792			build_handler_no_peers();
1793
1794		// Create 20MB of statements (200 statements x 100KB each)
1795		let num_statements = 200;
1796		let statement_size = 100 * 1024; // 100KB per statement
1797		let mut expected_hashes = Vec::new();
1798		for i in 0..num_statements {
1799			let mut statement = Statement::new();
1800			let mut data = vec![0u8; statement_size];
1801			data[0] = (i % 256) as u8;
1802			data[1] = (i / 256) as u8;
1803			statement.set_plain_data(data);
1804			let hash = statement.hash();
1805			expected_hashes.push(hash);
1806			statement_store.statements.lock().unwrap().insert(hash, statement);
1807		}
1808
1809		// Setup 3 peers and simulate connections
1810		let peer1 = PeerId::random();
1811		let peer2 = PeerId::random();
1812		let peer3 = PeerId::random();
1813
1814		// Connect peers
1815		for peer in [peer1, peer2, peer3] {
1816			handler
1817				.handle_notification_event(NotificationEvent::NotificationStreamOpened {
1818					peer,
1819					direction: sc_network::service::traits::Direction::Inbound,
1820					handshake: vec![],
1821					negotiated_fallback: None,
1822				})
1823				.await;
1824		}
1825
1826		// Verify all peers were added and initial syncs were queued
1827		assert_eq!(handler.peers.len(), 3);
1828		assert_eq!(handler.pending_initial_syncs.len(), 3);
1829		assert_eq!(handler.initial_sync_peer_queue.len(), 3);
1830
1831		// Track which peer was processed on each burst for round-robin verification
1832		let mut peer_burst_order = Vec::new();
1833		let mut burst_count = 0;
1834
1835		while !handler.pending_initial_syncs.is_empty() {
1836			// Record which peer will be processed next
1837			if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
1838				peer_burst_order.push(next_peer);
1839			}
1840			handler.process_initial_sync_burst().await;
1841			burst_count += 1;
1842			// Safety limit
1843			assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
1844		}
1845
1846		// Verify multiple bursts were needed
1847		// With 3 peers each needing roughly 20 bursts, we expect around 60 bursts total
1848		assert!(
1849			burst_count >= 30,
1850			"Expected many bursts for 3 peers with 200 statements each, got {}",
1851			burst_count
1852		);
1853
1854		// Verify round-robin pattern in the first 6 bursts (3 peers x 2 rounds)
1855		assert!(peer_burst_order.len() >= 6, "Expected at least 6 bursts");
1856		// First round
1857		assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
1858		assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
1859		assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
1860		// Second round
1861		assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
1862		assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
1863		assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
1864
1865		// Verify all peers received all statements
1866		let sent = notification_service.get_sent_notifications();
1867		let mut peer1_hashes: Vec<_> = sent
1868			.iter()
1869			.filter(|(peer, _)| *peer == peer1)
1870			.flat_map(|(_, notification)| {
1871				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1872			})
1873			.map(|s| s.hash())
1874			.collect();
1875		let mut peer2_hashes: Vec<_> = sent
1876			.iter()
1877			.filter(|(peer, _)| *peer == peer2)
1878			.flat_map(|(_, notification)| {
1879				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1880			})
1881			.map(|s| s.hash())
1882			.collect();
1883		let mut peer3_hashes: Vec<_> = sent
1884			.iter()
1885			.filter(|(peer, _)| *peer == peer3)
1886			.flat_map(|(_, notification)| {
1887				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1888			})
1889			.map(|s| s.hash())
1890			.collect();
1891
1892		peer1_hashes.sort();
1893		peer2_hashes.sort();
1894		peer3_hashes.sort();
1895		expected_hashes.sort();
1896
1897		assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
1898		assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
1899		assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
1900
1901		// Verify cleanup
1902		assert!(handler.pending_initial_syncs.is_empty());
1903		assert!(handler.initial_sync_peer_queue.is_empty());
1904	}
1905
1906	#[tokio::test]
1907	async fn test_send_statements_in_chunks_exact_max_size() {
1908		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1909			build_handler();
1910
1911		// Calculate the data sizes so that 100 statements together exactly fill max_size.
1912		// This tests that all 100 statements fit in a single notification.
1913		//
1914		// The limit check in find_sendable_chunk is:
1915		//   max_size = MAX_STATEMENT_NOTIFICATION_SIZE - Compact::<u32>::max_encoded_len()
1916		//
1917		// Statement encoding (encodes as Vec<Field>):
1918		// - Compact<u32> for number of fields (1 byte for value 2: expiry + data)
1919		// - Field::Expiry discriminant (1 byte, value 2)
1920		// - u64 expiry value (8 bytes)
1921		// - Field::Data discriminant (1 byte, value 8)
1922		// - Compact<u32> for the data length (2 bytes for small data)
1923		// So per-statement overhead = 1 + 1 + 8 + 1 + 2 = 13 bytes
1924		let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
1925		let num_statements: usize = 100;
1926		let per_statement_overhead = 1 + 1 + 8 + 1 + 2; // Vec<Field> length + expiry field + data discriminant + Compact data length
1927		let total_overhead = per_statement_overhead * num_statements;
1928		let total_data_size = max_size - total_overhead;
1929		let per_statement_data_size = total_data_size / num_statements;
1930		let remainder = total_data_size % num_statements;
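		// Together, data plus the 13 bytes of per-statement overhead should sum to exactly
		// max_size; this is asserted after the loop below.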
1931
1932		let mut expected_hashes = Vec::with_capacity(num_statements);
1933		let mut total_encoded_size = 0;
1934
1935		for i in 0..num_statements {
1936			let mut statement = Statement::new();
1937			// Distribute remainder across first `remainder` statements to exactly fill max_size
1938			let extra = if i < remainder { 1 } else { 0 };
1939			let mut data = vec![42u8; per_statement_data_size + extra];
1940			// Make each statement unique by modifying the first few bytes
1941			data[0] = i as u8;
1942			data[1] = (i >> 8) as u8;
1943			statement.set_plain_data(data);
1944
1945			total_encoded_size += statement.encoded_size();
1946
1947			let hash = statement.hash();
1948			expected_hashes.push(hash);
1949			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1950		}
1951
1952		// Verify our calculation: the 100 statements should exactly fill max_size
1953		assert!(
1954			total_encoded_size == max_size,
1955			"Total encoded size {} should exactly equal max_size {}",
1956			total_encoded_size,
1957			max_size
1958		);
1959
1960		handler.propagate_statements().await;
1961
1962		let sent = notification_service.get_sent_notifications();
1963
1964		// All statements should fit in a single chunk
1965		assert_eq!(
1966			sent.len(),
1967			1,
1968			"Expected 1 notification for all {} statements, but got {}",
1969			num_statements,
1970			sent.len()
1971		);
1972
1973		let (_peer, notification) = &sent[0];
1974		assert!(
1975			notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1976			"Notification size {} exceeds limit {}",
1977			notification.len(),
1978			MAX_STATEMENT_NOTIFICATION_SIZE
1979		);
1980
1981		let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
1982		assert_eq!(
1983			decoded.len(),
1984			num_statements,
1985			"Expected {} statements in the notification",
1986			num_statements
1987		);
1988
1989		// Verify all statements were sent (order may differ due to HashMap iteration)
1990		let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
1991		expected_hashes.sort();
1992		received_hashes.sort();
1993		assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
1994	}
1995
1996	#[tokio::test]
1997	async fn test_initial_sync_burst_size_limit_consistency() {
1998		// This test verifies that process_initial_sync_burst and find_sendable_chunk
1999		// use the same size limit (max_statement_payload_size).
2000		//
2001		// Previously there was a bug where the filter in process_initial_sync_burst used
2002		// MAX_STATEMENT_NOTIFICATION_SIZE, but find_sendable_chunk reserved extra space
2003		// for Compact::<u32>::max_encoded_len(). This caused a debug_assert failure when
2004		// statements fit the filter but not find_sendable_chunk.
2005		//
2006		// With the fix, both use max_statement_payload_size(), so the filter will reject
2007		// statements that wouldn't fit in find_sendable_chunk.
2008		let (mut handler, statement_store, _network, notification_service) =
2009			build_handler_no_peers();
2010
2011		let payload_limit = max_statement_payload_size();
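		// Per the chunking logic described above, this should equal
		// MAX_STATEMENT_NOTIFICATION_SIZE minus the reserved Compact::<u32> length prefix.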
2012
2013		// Create first statement that's just over half the payload limit
2014		let first_stmt_data_size = payload_limit / 2 + 10;
2015		let mut stmt1 = Statement::new();
2016		stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
2017		let stmt1_encoded_size = stmt1.encoded_size();
2018
2019		// Create second statement that, combined with the first, exceeds the payload limit.
2020		// This means the filter will only accept the first statement.
2021		let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
2022		let target_stmt2_encoded = remaining + 3; // 3 bytes over limit when combined
2023		let stmt2_data_size = target_stmt2_encoded.saturating_sub(4); // approximate encoding overhead; the assert below verifies the setup
2024		let mut stmt2 = Statement::new();
2025		stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
2026		let stmt2_encoded_size = stmt2.encoded_size();
2027
2028		let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
2029
2030		// Verify our setup: total exceeds payload limit
2031		assert!(
2032			total_encoded > payload_limit,
2033			"Total {} should exceed payload_limit {} so filter rejects second statement",
2034			total_encoded,
2035			payload_limit
2036		);
2037
2038		let hash1 = stmt1.hash();
2039		let hash2 = stmt2.hash();
2040		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
2041		statement_store.statements.lock().unwrap().insert(hash2, stmt2);
2042
2043		// Setup peer and simulate connection
2044		let peer_id = PeerId::random();
2045
2046		handler
2047			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2048				peer: peer_id,
2049				direction: sc_network::service::traits::Direction::Inbound,
2050				handshake: vec![],
2051				negotiated_fallback: None,
2052			})
2053			.await;
2054
2055		// Verify initial sync was queued with both hashes
2056		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2057		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
2058
2059		// Process first burst - should send only one statement (the other doesn't fit)
2060		handler.process_initial_sync_burst().await;
2061
2062		// With the fix, the filter and find_sendable_chunk use the same limit,
2063		// so no assertion failure occurs. Only one statement is fetched and sent.
2064		let sent = notification_service.get_sent_notifications();
2065		assert_eq!(sent.len(), 1, "First burst should send one notification");
2066
2067		let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
2068		assert_eq!(decoded.len(), 1, "First notification should contain one statement");
2069
2070		// Verify one of the two statements was sent (order is non-deterministic due to HashMap)
2071		let sent_hash = decoded[0].hash();
2072		assert!(
2073			sent_hash == hash1 || sent_hash == hash2,
2074			"Sent statement should be one of the two created"
2075		);
2076
2077		// Second statement should still be pending
2078		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2079		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
2080
2081		// Process second burst - should send the remaining statement
2082		handler.process_initial_sync_burst().await;
2083
2084		let sent = notification_service.get_sent_notifications();
2085		assert_eq!(sent.len(), 2, "Second burst should send another notification");
2086
2087		// Both statements should now be sent
2088		let mut sent_hashes: Vec<_> = sent
2089			.iter()
2090			.flat_map(|(_, notification)| {
2091				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2092			})
2093			.map(|s| s.hash())
2094			.collect();
2095		sent_hashes.sort();
2096		let mut expected_hashes = vec![hash1, hash2];
2097		expected_hashes.sort();
2098		assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
2099
2100		// No more pending
2101		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
2102	}
2103
2104	#[tokio::test]
2105	async fn test_peer_disconnected_on_flooding() {
2106		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2107			build_handler();
2108
2109		let peer_id = *handler.peers.keys().next().unwrap();
2110
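		// 600,000 statements in one call is far above the burst allowance of
		// DEFAULT_STATEMENTS_PER_SECOND * STATEMENTS_BURST_COEFFICIENT (250,000 per the
		// tests below), so the rate limiter is expected to report the peer for flooding.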
2111		let mut flood_statements = Vec::new();
2112		for i in 0..600_000 {
2113			let mut statement = Statement::new();
2114			statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2115			flood_statements.push(statement);
2116		}
2117
2118		handler.on_statements(peer_id, flood_statements);
2119
2120		let reports = network.get_reports();
2121		assert!(
2122			reports
2123				.iter()
2124				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2125			"Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2126			reports
2127		);
2128
2129		let disconnected = network.get_disconnected_peers();
2130		assert!(
2131			disconnected.contains(&peer_id),
2132			"Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2133			peer_id,
2134			disconnected
2135		);
2136
2137		// Verify peer state was cleaned up
2138		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2139		assert!(
2140			!handler.pending_initial_syncs.contains_key(&peer_id),
2141			"Peer should be removed from pending_initial_syncs"
2142		);
2143		assert!(
2144			!handler.initial_sync_peer_queue.contains(&peer_id),
2145			"Peer should be removed from initial_sync_peer_queue"
2146		);
2147	}
2148
2149	#[tokio::test]
2150	async fn test_legitimate_traffic_not_flagged() {
2151		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2152			build_handler();
2153
2154		let peer_id = *handler.peers.keys().next().unwrap();
2155
2156		let start = std::time::Instant::now();
2157		let duration = std::time::Duration::from_secs(5);
2158		let mut counter = 0u32;
2159
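		// 5,000 statements every ~100ms is roughly 50,000 per second, which should stay
		// within the configured per-second allowance, so no flooding report is expected.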
2160		while start.elapsed() < duration {
2161			let mut statements = Vec::new();
2162			for i in 0..5_000 {
2163				let mut statement = Statement::new();
2164				statement.set_plain_data(vec![
2165					counter as u8,
2166					(counter >> 8) as u8,
2167					(counter >> 16) as u8,
2168					i as u8,
2169				]);
2170				statements.push(statement);
2171				counter = counter.wrapping_add(1);
2172			}
2173
2174			handler.on_statements(peer_id, statements);
2175
2176			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2177		}
2178
2179		let reports = network.get_reports();
2180		assert!(
2181			!reports
2182				.iter()
2183				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2184			"Legitimate traffic should not trigger flooding detection. Reports: {:?}",
2185			reports
2186		);
2187
2188		let disconnected = network.get_disconnected_peers();
2189		assert!(
2190			!disconnected.contains(&peer_id),
2191			"Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
2192			disconnected
2193		);
2194
2195		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
2196	}
2197
2198	#[tokio::test]
2199	async fn test_just_over_rate_limit_triggers_flooding() {
2200		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2201			build_handler();
2202
2203		let peer_id = *handler.peers.keys().next().unwrap();
2204
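		// A single burst of 260,000 statements is just above the allowed burst of
		// DEFAULT_STATEMENTS_PER_SECOND * STATEMENTS_BURST_COEFFICIENT (see expected_burst
		// below), so this should be reported as flooding.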
2205		let mut statements = Vec::new();
2206		for i in 0..260_000 {
2207			let mut statement = Statement::new();
2208			statement.set_plain_data(vec![
2209				i as u8,
2210				(i >> 8) as u8,
2211				(i >> 16) as u8,
2212				(i >> 24) as u8,
2213			]);
2214			statements.push(statement);
2215		}
2216
2217		handler.on_statements(peer_id, statements);
2218
2219		let reports = network.get_reports();
2220		let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2221		assert!(
2222			reports
2223				.iter()
2224				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2225			"Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2226			expected_burst,
2227			reports
2228		);
2229
2230		let disconnected = network.get_disconnected_peers();
2231		assert!(
2232			disconnected.contains(&peer_id),
2233			"Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2234			disconnected
2235		);
2236
2237		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2238	}
2239
2240	#[tokio::test]
2241	async fn test_burst_of_250k_statements_allowed() {
2242		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2243			build_handler();
2244
2245		let peer_id = *handler.peers.keys().next().unwrap();
2246
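		// 250,000 statements matches the burst allowance exactly (rate x
		// STATEMENTS_BURST_COEFFICIENT), so the limiter should accept the whole burst
		// without reporting the peer.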
2247		let mut statements = Vec::new();
2248		for i in 0..250_000 {
2249			let mut statement = Statement::new();
2250			statement.set_plain_data(vec![
2251				i as u8,
2252				(i >> 8) as u8,
2253				(i >> 16) as u8,
2254				(i >> 24) as u8,
2255			]);
2256			statements.push(statement);
2257		}
2258
2259		handler.on_statements(peer_id, statements);
2260
2261		let reports = network.get_reports();
2262		assert!(
2263			!reports
2264				.iter()
2265				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2266			"250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2267			reports
2268		);
2269
2270		assert!(
2271			handler.peers.contains_key(&peer_id),
2272			"Peer should still be connected after 250k burst"
2273		);
2274	}
2275
2276	#[tokio::test]
2277	async fn test_sustained_rate_above_limit_triggers_flooding() {
2278		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2279			build_handler();
2280
2281		let peer_id = *handler.peers.keys().next().unwrap();
2282
2283		let mut counter = 0u32;
2284
2285		let start = std::time::Instant::now();
2286		let duration = std::time::Duration::from_secs(5);
2287
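		// 30,000 statements per ~100ms iteration is roughly 300,000 per second, sustained
		// well above the per-second rate, so the burst credit should be exhausted within
		// the 5-second window and flooding detected.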
2288		let mut flooding_detected = false;
2289		while start.elapsed() < duration {
2290			let mut statements = Vec::new();
2291			for i in 0..30_000 {
2292				let mut statement = Statement::new();
2293				statement.set_plain_data(vec![
2294					counter as u8,
2295					(counter >> 8) as u8,
2296					(counter >> 16) as u8,
2297					i as u8,
2298				]);
2299				statements.push(statement);
2300				counter = counter.wrapping_add(1);
2301			}
2302
2303			handler.on_statements(peer_id, statements);
2304
2305			// Check if flooding was detected
2306			let reports = network.get_reports();
2307			if reports
2308				.iter()
2309				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
2310			{
2311				flooding_detected = true;
2312				break;
2313			}
2314
2315			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2316		}
2317
2318		assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
2319
2320		let disconnected = network.get_disconnected_peers();
2321		assert!(
2322			disconnected.contains(&peer_id),
2323			"Peer should be disconnected after sustained high rate. Disconnected: {:?}",
2324			disconnected
2325		);
2326
2327		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2328	}
2329}