Skip to main content

sc_network_statement/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Statement handling to plug on top of the network service.
20//!
21//! Usage:
22//!
23//! - Use [`StatementHandlerPrototype::new`] to create a prototype.
24//! - Pass the `NonDefaultSetConfig` returned from [`StatementHandlerPrototype::new`] to the network
25//!   configuration as an extra peers set.
26//! - Use [`StatementHandlerPrototype::build`] then [`StatementHandler::run`] to obtain a
//!   `Future` that processes statements.
28
29mod affinity;
30
31use crate::config::*;
32
33use affinity::AffinityFilter;
34use codec::{Compact, Decode, Encode, MaxEncodedLen};
35use futures::{
36	channel::oneshot,
37	future::{pending, FusedFuture},
38	prelude::*,
39	stream::FuturesUnordered,
40};
41use governor::{
42	clock::DefaultClock,
43	state::{InMemoryState, NotKeyed},
44	Quota, RateLimiter,
45};
46use prometheus_endpoint::{
47	exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
48	Registry, U64,
49};
50use rand::seq::IteratorRandom;
51use sc_network::{
52	config::{NonReservedPeerMode, SetConfig},
53	error, multiaddr,
54	peer_store::PeerStoreProvider,
55	service::{
56		traits::{NotificationEvent, NotificationService, ValidationResult},
57		NotificationMetrics,
58	},
59	types::ProtocolName,
60	utils::{interval, LruHashSet},
61	NetworkBackend, NetworkEventStream, NetworkPeers,
62};
63use sc_network_sync::{SyncEvent, SyncEventStream};
64use sc_network_types::PeerId;
65use sp_runtime::traits::Block as BlockT;
66use sp_statement_store::{
67	FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
68};
69use std::{
70	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
71	iter,
72	num::{NonZeroU32, NonZeroUsize},
73	pin::Pin,
74	sync::Arc,
75	time::Instant,
76};
77use tokio::time::timeout;
78pub mod config;
79
/// A batch of statements, stored as a plain `Vec`.
pub type Statements = Vec<Statement>;
82
/// The protocol version that was negotiated with a peer.
///
/// The version determines how statement batches are framed on the wire and therefore how
/// much envelope overhead has to be reserved per notification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerProtocolVersion {
	/// V1: messages are encoded as `Vec<Statement>` (the legacy format).
	V1,
	/// V2: messages are encoded as `StatementMessage` enum (supports topic affinity).
	V2,
}
91
92impl PeerProtocolVersion {
93	/// Returns the encoding envelope overhead for this protocol version.
94	fn envelope_overhead(&self) -> usize {
95		match self {
96			PeerProtocolVersion::V1 => V1_ENVELOPE_OVERHEAD,
97			PeerProtocolVersion::V2 => V2_ENVELOPE_OVERHEAD,
98		}
99	}
100}
101
/// Message envelope used with peers that negotiated the V2 statement protocol.
///
/// The explicit `#[codec(index)]` attributes pin the wire discriminants of the variants.
#[derive(Debug, Encode, Decode)]
enum StatementMessage {
	/// A batch of statements.
	#[codec(index = 0)]
	Statements(Vec<Statement>),
	/// Bloom filter bytes representing the topics this peer is interested in.
	#[codec(index = 1)]
	ExplicitTopicAffinity(AffinityFilter),
}
110
/// Codec variant index for `StatementMessage::Statements`, kept in sync with `#[codec(index)]`.
///
/// Used when a `Statements` message is encoded by hand from statement references.
const STATEMENTS_VARIANT_INDEX: u8 = 0;
113
114impl StatementMessage {
115	/// Encode a slice of statement references as a `StatementMessage::Statements`
116	/// without cloning the statements.
117	fn encode_statement_refs(statements: &[&Statement]) -> Vec<u8> {
118		let mut out = Vec::new();
119		STATEMENTS_VARIANT_INDEX.encode_to(&mut out);
120		statements.encode_to(&mut out);
121		out
122	}
123}
124
/// Future resolving to statement import result.
///
/// This is the receiving half of a oneshot channel carrying the `SubmitResult`.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
127
/// Reputation changes applied to peers depending on the statements they send us.
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Reputation change when a peer sends us any statement.
	///
	/// This forces the node to verify it, thus the negative value here. Once the statement is
	/// verified, the reputation change should be refunded with `ANY_STATEMENT_REFUND`.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Reputation change when a peer sends us any statement that is not invalid.
	///
	/// Exactly offsets `ANY_STATEMENT`.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
	/// Reputation change when a peer sends us a statement that we didn't know about.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
	/// Reputation change when a peer sends us an invalid statement.
	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
	/// Reputation change when a peer sends us a duplicate statement.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Reputation change when a peer floods us with statements.
	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
	/// Reputation change when a peer sends us a message we can't decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad statement message");
}
148
/// Log target for messages emitted by the statement handler.
const LOG_TARGET: &str = "statement-gossip";
/// V2 statement protocol suffix. This is a work-in-progress protocol with topic affinity and
/// other improvements; it may have breaking changes before stabilization.
const STATEMENT_PROTOCOL_V2: &str = "statement/2";
/// V1 statement protocol suffix. This is the current stable protocol; no breaking changes will
/// be made to it.
const STATEMENT_PROTOCOL_V1: &str = "statement/1";
/// Maximum time we wait for sending a notification to a peer.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval for sending statement batches during initial sync to new peers.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
/// Interval for processing pending topic affinity changes from peers.
const PENDING_AFFINITIES_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// Delay before re-adding a peer to the reserved set after a forced disconnect for sync recovery.
const SYNC_RECOVERY_READD_DELAY: std::time::Duration = std::time::Duration::from_secs(60);
163
/// Prometheus metrics exported by the statement handler.
struct Metrics {
	/// Total statements propagated to peers, counted once per recipient.
	propagated_statements: Counter<U64>,
	/// Statements received via gossip that were already in the statement store.
	known_statements_received: Counter<U64>,
	/// Oversized statements that were skipped instead of being gossiped.
	skipped_oversized_statements: Counter<U64>,
	/// Distribution of chunk sizes when propagating statements.
	propagated_statements_chunks: Histogram,
	/// Number of pending statement validations, sampled once per propagation tick.
	pending_statements: Gauge<U64>,
	/// Statements ignored due to exceeding the `MAX_PENDING_STATEMENTS` limit.
	ignored_statements: Counter<U64>,
	/// Number of peers connected using the statement protocol.
	peers_connected: Gauge<U64>,
	/// Total number of statements received from peers.
	statements_received: Counter<U64>,
	/// Total bytes sent for statement protocol messages.
	bytes_sent_total: Counter<U64>,
	/// Total bytes received for statement protocol messages, including notifications that are
	/// later discarded.
	bytes_received_total: Counter<U64>,
	/// Time taken to send statement messages to peers.
	sent_latency_seconds: Histogram,
	/// Total statements sent during initial sync bursts to newly connected peers.
	initial_sync_statements_sent: Counter<U64>,
	/// Total initial-sync burst rounds attempted.
	initial_sync_bursts_total: Counter<U64>,
	/// Number of peers currently being synced via initial sync.
	initial_sync_peers_active: Gauge<U64>,
	/// Per-peer duration of initial sync, from start until completion or peer disconnect.
	initial_sync_duration_seconds: Histogram,
	/// Number of peers disconnected for exceeding statement rate limits.
	statement_flooding_detected: Counter<U64>,
}
182
183impl Metrics {
184	fn register(r: &Registry) -> Result<Self, PrometheusError> {
185		Ok(Self {
186			propagated_statements: register(
187				Counter::new(
188					"substrate_sync_propagated_statements",
189					"Total statements propagated to peers, counted once per recipient (a statement sent to N peers increments by N)",
190				)?,
191				r,
192			)?,
193			known_statements_received: register(
194				Counter::new(
195					"substrate_sync_known_statement_received",
196					"Number of statements received via gossiping that were already in the statement store",
197				)?,
198				r,
199			)?,
200			skipped_oversized_statements: register(
201				Counter::new(
202					"substrate_sync_skipped_oversized_statements",
203					"Number of oversized statements that were skipped to be gossiped",
204				)?,
205				r,
206			)?,
207			propagated_statements_chunks: register(
208				Histogram::with_opts(
209					HistogramOpts::new(
210						"substrate_sync_propagated_statements_chunks",
211						"Distribution of chunk sizes when propagating statements",
212					)
213					.buckets(exponential_buckets(1.0, 2.0, 14)?),
214				)?,
215				r,
216			)?,
217			pending_statements: register(
218				Gauge::new(
219					"substrate_sync_pending_statement_validations",
220					"Number of pending statement validations, sampled once per propagation tick",
221				)?,
222				r,
223			)?,
224			ignored_statements: register(
225				Counter::new(
226					"substrate_sync_ignored_statements",
227					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
228				)?,
229				r,
230			)?,
231			peers_connected: register(
232				Gauge::new(
233					"substrate_sync_statement_peers_connected",
234					"Number of peers connected using the statement protocol",
235				)?,
236				r,
237			)?,
238			statements_received: register(
239				Counter::new(
240					"substrate_sync_statements_received",
241					"Total number of statements received from peers",
242				)?,
243				r,
244			)?,
245			bytes_sent_total: register(
246				Counter::new(
247					"substrate_sync_statement_bytes_sent_total",
248					"Total bytes sent for statement protocol messages",
249				)?,
250				r,
251			)?,
252			bytes_received_total: register(
253				Counter::new(
254					"substrate_sync_statement_bytes_received_total",
255					"Total bytes received for statement protocol messages (includes bytes from notifications that are later discarded — e.g. while major-syncing)",
256				)?,
257				r,
258			)?,
259			sent_latency_seconds: register(
260				Histogram::with_opts(
261					HistogramOpts::new(
262						"substrate_sync_statement_sent_latency_seconds",
263						"Time to send statement messages to peers",
264					)
265					// Buckets from 1μs to ~1s covering microsecond to millisecond range.
266					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
267				)?,
268				r,
269			)?,
270			initial_sync_statements_sent: register(
271				Counter::new(
272					"substrate_sync_initial_sync_statements_sent",
273					"Total statements sent during initial sync bursts to newly connected peers",
274				)?,
275				r,
276			)?,
277			initial_sync_bursts_total: register(
278				Counter::new(
279					"substrate_sync_initial_sync_bursts_total",
280					"Total initial-sync burst rounds attempted (includes rounds that return early with no hashes left)",
281				)?,
282				r,
283			)?,
284			initial_sync_peers_active: register(
285				Gauge::new(
286					"substrate_sync_initial_sync_peers_active",
287					"Number of peers currently being synced via initial sync",
288				)?,
289				r,
290			)?,
291			initial_sync_duration_seconds: register(
292				Histogram::with_opts(
293					HistogramOpts::new(
294						"substrate_sync_initial_sync_duration_seconds",
295						"Per-peer duration of initial sync from start until completion or peer disconnect (whichever comes first)",
296					)
297					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
298				)?,
299				r,
300			)?,
301			statement_flooding_detected: register(
302				Counter::new(
303					"substrate_sync_statement_flooding_detected",
304					"Number of peers disconnected for exceeding statement rate limits",
305				)?,
306				r,
307			)?,
308		})
309	}
310}
311
/// Prototype for a [`StatementHandler`].
///
/// Created by [`StatementHandlerPrototype::new`] together with the notification protocol
/// configuration, and later turned into the handler by [`StatementHandlerPrototype::build`].
pub struct StatementHandlerPrototype {
	/// Full name of the main (V2) statement protocol, including the genesis hash and the
	/// optional fork id.
	protocol_name: ProtocolName,
	/// Notification service handle obtained when the protocol configuration was created.
	notification_service: Box<dyn NotificationService>,
}
317
318impl StatementHandlerPrototype {
319	/// Create a new instance.
320	pub fn new<
321		Hash: AsRef<[u8]>,
322		Block: BlockT,
323		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
324	>(
325		genesis_hash: Hash,
326		fork_id: Option<&str>,
327		metrics: NotificationMetrics,
328		peer_store_handle: Arc<dyn PeerStoreProvider>,
329	) -> (Self, Net::NotificationProtocolConfig) {
330		let genesis_hash = genesis_hash.as_ref();
331		let hex = array_bytes::bytes2hex("", genesis_hash);
332		let (protocol_name, fallback_name) = if let Some(fork_id) = fork_id {
333			(
334				format!("/{hex}/{fork_id}/{STATEMENT_PROTOCOL_V2}"),
335				format!("/{hex}/{fork_id}/{STATEMENT_PROTOCOL_V1}"),
336			)
337		} else {
338			(format!("/{hex}/{STATEMENT_PROTOCOL_V2}"), format!("/{hex}/{STATEMENT_PROTOCOL_V1}"))
339		};
340		let (config, notification_service) = Net::notification_config(
341			protocol_name.clone().into(),
342			vec![fallback_name.into()],
343			MAX_STATEMENT_NOTIFICATION_SIZE,
344			None,
345			SetConfig {
346				in_peers: 0,
347				out_peers: 0,
348				reserved_nodes: Vec::new(),
349				non_reserved_mode: NonReservedPeerMode::Deny,
350			},
351			metrics,
352			peer_store_handle,
353		);
354
355		(Self { protocol_name: protocol_name.into(), notification_service }, config)
356	}
357
358	/// Turns the prototype into the actual handler.
359	///
360	/// Important: the statements handler is initially disabled and doesn't gossip statements.
361	/// Gossiping is enabled when major syncing is done.
362	pub fn build<
363		N: NetworkPeers + NetworkEventStream,
364		S: SyncEventStream + sp_consensus::SyncOracle,
365	>(
366		self,
367		network: N,
368		sync: S,
369		statement_store: Arc<dyn StatementStore>,
370		metrics_registry: Option<&Registry>,
371		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
372		mut num_submission_workers: usize,
373		statements_per_second: u32,
374	) -> error::Result<StatementHandler<N, S>> {
375		let sync_event_stream = sync.event_stream("statement-handler-sync");
376		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
377
378		if num_submission_workers == 0 {
379			log::warn!(
380				target: LOG_TARGET,
381				"num_submission_workers is 0, defaulting to 1"
382			);
383			num_submission_workers = 1;
384		}
385
386		let statements_per_second = match NonZeroU32::new(statements_per_second) {
387			Some(rate) => rate,
388			None => {
389				log::warn!(
390					target: LOG_TARGET,
391					"statements_per_second is 0, defaulting to {}",
392					DEFAULT_STATEMENTS_PER_SECOND
393				);
394				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
395					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
396			},
397		};
398
399		let metrics =
400			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };
401
402		for _ in 0..num_submission_workers {
403			let store = statement_store.clone();
404			let mut queue_receiver = queue_receiver.clone();
405			executor(
406				async move {
407					loop {
408						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
409							queue_receiver.next().await;
410						match task {
411							None => return,
412							Some((statement, completion)) => {
413								let result = store.submit(statement, StatementSource::Network);
414								if completion.send(result).is_err() {
415									log::debug!(
416										target: LOG_TARGET,
417										"Error sending validation completion"
418									);
419								}
420							},
421						}
422					}
423				}
424				.boxed(),
425			);
426		}
427
428		let handler = StatementHandler {
429			protocol_name: self.protocol_name,
430			notification_service: self.notification_service,
431			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
432				as Pin<Box<dyn Stream<Item = ()> + Send>>)
433				.fuse(),
434			pending_statements: FuturesUnordered::new(),
435			pending_statements_peers: HashMap::new(),
436			network,
437			sync,
438			sync_event_stream: sync_event_stream.fuse(),
439			peers: HashMap::new(),
440			statement_store,
441			queue_sender,
442			statements_per_second,
443			metrics,
444			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
445			pending_affinities_timeout: Box::pin(
446				tokio::time::sleep(PENDING_AFFINITIES_INTERVAL).fuse(),
447			),
448			pending_initial_syncs: HashMap::new(),
449			initial_sync_peer_queue: VecDeque::new(),
450			deferred_peers: HashSet::new(),
451			dropped_statements_during_sync: false,
452			sync_recovery_peer: None,
453			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
454		};
455
456		Ok(handler)
457	}
458}
459
/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
> {
	/// Name of the statement notification protocol, used when changing the reserved peer set.
	protocol_name: ProtocolName,
	/// Interval at which we call `propagate_statements`.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// Pending statements verification tasks.
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	/// As multiple peers can send us the same statement, we group
	/// these peers using the statement hash while the statement is
	/// imported. This prevents that we import the same statement
	/// multiple times concurrently.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	/// Network service to use to send messages and manage peers.
	network: N,
	/// Syncing service.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// Notification service.
	notification_service: Box<dyn NotificationService>,
	/// All connected peers.
	peers: HashMap<PeerId, Peer>,
	/// Handle to the statement store.
	statement_store: Arc<dyn StatementStore>,
	/// Sending half of the queue of statements awaiting submission. Each entry carries the
	/// oneshot sender on which the `SubmitResult` is reported back.
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	/// Maximum statements per second per peer.
	statements_per_second: NonZeroU32,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
	/// Timeout for sending next statement batch during initial sync.
	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	/// Timeout for processing pending topic affinity changes.
	pending_affinities_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	/// Pending initial syncs per peer.
	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
	/// Queue for round-robin processing of initial syncs.
	initial_sync_peer_queue: VecDeque<PeerId>,
	/// Tracks peers that connected while major sync was active and adds them to the reserved set
	/// once sync ends.
	deferred_peers: HashSet<PeerId>,
	/// Set to `true` when an incoming statement is dropped because `is_major_syncing()` is true.
	dropped_statements_during_sync: bool,
	/// Peer scheduled for forced disconnect+reconnect to recover statements missed during sync.
	sync_recovery_peer: Option<PeerId>,
	/// Fires when the `sync_recovery_peer` re-add delay has elapsed.
	sync_recovery_readd_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
}
510
/// Per-peer rate limiter using a token bucket algorithm.
///
/// The token bucket allows short bursts up to the per-second limit while enforcing
/// the average rate over time.
#[derive(Debug)]
struct PeerRateLimiter {
	/// Un-keyed, in-memory `governor` limiter holding this peer's quota.
	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
519
520impl PeerRateLimiter {
521	fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
522		let quota = Quota::per_second(statements_per_second).allow_burst(burst);
523		Self { limiter: RateLimiter::direct(quota) }
524	}
525
526	/// Check if receiving `count` statements would exceed the rate limit.
527	fn is_flooding(&self, count: usize) -> bool {
528		if count > u32::MAX as usize {
529			return true;
530		}
531
532		let Some(n) = NonZeroU32::new(count as u32) else {
533			return false;
534		};
535		!matches!(self.limiter.check_n(n), Ok(Ok(())))
536	}
537}
538
/// Per-peer state kept by the statement handler for every connected peer.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
	/// Holds a set of statements known to this peer.
	known_statements: LruHashSet<Hash>,
	/// Rate limiter for statement flooding protection.
	rate_limiter: PeerRateLimiter,
	/// Protocol version negotiated with this peer.
	protocol_version: PeerProtocolVersion,
	/// Topic affinity filter received from a v2 peer.
	/// When set, only statements matching this filter should be propagated to the peer.
	topic_affinity: Option<AffinityFilter>,
	/// Whether this peer is a light client.
	/// Light clients on V2 must set topic affinity before receiving statements.
	is_light: bool,
	/// A pending topic affinity filter waiting to be scheduled for initial sync.
	/// Set when a new `ExplicitTopicAffinity` arrives; consumed by the main loop
	/// once any in-progress initial sync for this peer completes.
	pending_topic_affinity: Option<AffinityFilter>,
}
560
/// Tracks pending initial sync state for a peer (hashes only, statements fetched on-demand).
struct PendingInitialSync {
	/// Hashes of the statements involved in this initial sync.
	hashes: Vec<Hash>,
	/// Instant at which this initial sync was started.
	// NOTE(review): presumably the basis for `initial_sync_duration_seconds` — confirm.
	started_at: Instant,
}
566
/// Result of finding a sendable chunk of statements.
enum ChunkResult {
	/// Found a chunk that fits. Contains the end index (exclusive).
	///
	/// `Send(0)` is returned only when there were no statements to chunk at all.
	Send(usize),
	/// First statement is oversized, skip it.
	SkipOversized,
}
574
/// Result of sending a chunk of statements.
enum SendChunkResult {
	/// Successfully sent a chunk of N statements, taken from the front of the input slice.
	Sent(usize),
	/// First statement was oversized and skipped.
	Skipped,
	/// Nothing to send.
	Empty,
	/// Send failed.
	Failed,
}
586
/// Encoding overhead for V1: just the `Compact<u32>` vec length prefix (max 5 bytes).
///
/// This is the worst-case prefix size, so the reserved space is conservative for short batches.
const V1_ENVELOPE_OVERHEAD: usize = 5;

/// Encoding overhead for V2: 1 byte enum discriminant + `Compact<u32>` vec length prefix.
const V2_ENVELOPE_OVERHEAD: usize = 1 + V1_ENVELOPE_OVERHEAD;
592
593/// Returns the maximum payload size for statement notifications given the
594/// protocol envelope overhead.
595fn max_statement_payload_size(envelope_overhead: usize) -> usize {
596	debug_assert_eq!(
597		V1_ENVELOPE_OVERHEAD,
598		Compact::<u32>::max_encoded_len(),
599		"V1_ENVELOPE_OVERHEAD must equal Compact::<u32>::max_encoded_len()"
600	);
601	MAX_STATEMENT_NOTIFICATION_SIZE as usize - envelope_overhead
602}
603
604/// Find the largest chunk of statements starting from the beginning that fits
605/// within MAX_STATEMENT_NOTIFICATION_SIZE minus the given `envelope_overhead`.
606///
607/// Uses an incremental approach: adds statements one by one until the limit is reached.
608/// This is efficient because we only compute sizes for statements we'll actually send
609/// in this chunk, rather than computing sizes for all statements upfront.
610fn find_sendable_chunk(statements: &[&Statement], envelope_overhead: usize) -> ChunkResult {
611	if statements.is_empty() {
612		return ChunkResult::Send(0);
613	}
614	let max_size = max_statement_payload_size(envelope_overhead);
615
616	// Incrementally add statements until we exceed the limit.
617	// This is efficient because we only compute sizes for statements in this chunk.
618	// accumulated_size is the sum of encoded sizes of all statements so far (without vec
619	// overhead).
620	let mut accumulated_size = 0;
621	let mut count = 0usize;
622
623	for stmt in &statements[0..] {
624		let stmt_size = stmt.encoded_size();
625		let new_count = count + 1;
626		// Compact encoding overhead for the new count
627		let new_total = accumulated_size + stmt_size;
628		if new_total > max_size {
629			break;
630		}
631
632		accumulated_size += stmt_size;
633		count = new_count;
634	}
635
636	// If we couldn't fit even a single statement, skip it.
637	if count == 0 {
638		ChunkResult::SkipOversized
639	} else {
640		ChunkResult::Send(count)
641	}
642}
643
644impl Peer {
645	/// Create a new peer for testing/benchmarking purposes.
646	#[cfg(any(test, feature = "test-helpers"))]
647	pub fn new_for_testing(
648		known_statements: LruHashSet<Hash>,
649		statements_per_second: NonZeroU32,
650		burst: NonZeroU32,
651	) -> Self {
652		Self {
653			known_statements,
654			rate_limiter: PeerRateLimiter::new(statements_per_second, burst),
655			protocol_version: PeerProtocolVersion::V1,
656			topic_affinity: None,
657			is_light: false,
658			pending_topic_affinity: None,
659		}
660	}
661
662	/// Whether this peer is ready to receive statements.
663	///
664	/// Light V2 peers must set their topic affinity before receiving any statements.
665	fn can_receive(&self) -> bool {
666		!(self.is_light &&
667			self.protocol_version == PeerProtocolVersion::V2 &&
668			self.topic_affinity.is_none())
669	}
670}
671
672impl<N, S> StatementHandler<N, S>
673where
674	N: NetworkPeers + NetworkEventStream,
675	S: SyncEventStream + sp_consensus::SyncOracle,
676{
677	/// Create a new `StatementHandler` for testing/benchmarking purposes.
678	#[cfg(any(test, feature = "test-helpers"))]
679	pub fn new_for_testing(
680		protocol_name: ProtocolName,
681		notification_service: Box<dyn NotificationService>,
682		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
683		network: N,
684		sync: S,
685		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
686		peers: HashMap<PeerId, Peer>,
687		statement_store: Arc<dyn StatementStore>,
688		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
689		statements_per_second: NonZeroU32,
690	) -> Self {
691		Self {
692			protocol_name,
693			notification_service,
694			propagate_timeout,
695			pending_statements: FuturesUnordered::new(),
696			pending_statements_peers: HashMap::new(),
697			network,
698			sync,
699			sync_event_stream,
700			peers,
701			statement_store,
702			queue_sender,
703			statements_per_second,
704			metrics: None,
705			initial_sync_timeout: Box::pin(pending().fuse()),
706			pending_affinities_timeout: Box::pin(pending().fuse()),
707			pending_initial_syncs: HashMap::new(),
708			initial_sync_peer_queue: VecDeque::new(),
709			deferred_peers: HashSet::new(),
710			dropped_statements_during_sync: false,
711			sync_recovery_peer: None,
712			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
713		}
714	}
715
	/// Get mutable access to pending statements for testing/benchmarking.
	///
	/// Returns the set of in-flight verification futures. Each future resolves to the
	/// statement hash together with an optional `SubmitResult`.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn pending_statements_mut(
		&mut self,
	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
	{
		&mut self.pending_statements
	}
724
725	/// Turns the [`StatementHandler`] into a future that should run forever and not be
726	/// interrupted.
727	pub async fn run(mut self) {
728		loop {
729			futures::select_biased! {
730				_ = self.propagate_timeout.next() => {
731					self.propagate_statements().await;
732					self.metrics.as_ref().map(|metrics| {
733						metrics.pending_statements.set(self.pending_statements.len() as u64);
734					});
735				},
736				(hash, result) = self.pending_statements.select_next_some() => {
737					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
738						if let Some(result) = result {
739							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
740						}
741					} else {
742						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
743					}
744				},
745				sync_event = self.sync_event_stream.next() => {
746					if let Some(sync_event) = sync_event {
747						self.handle_sync_event(sync_event);
748					} else {
749						// Syncing has seemingly closed. Closing as well.
750						return;
751					}
752				}
753				event = self.notification_service.next_event().fuse() => {
754					if let Some(event) = event {
755						self.handle_notification_event(event).await
756					} else {
757						// `Notifications` has seemingly closed. Closing as well.
758						return
759					}
760				}
761				_ = &mut self.initial_sync_timeout => {
762					self.process_initial_sync_burst().await;
763					self.initial_sync_timeout =
764						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
765				},
766				_ = &mut self.pending_affinities_timeout => {
767					self.process_pending_affinities();
768					self.pending_affinities_timeout =
769						Box::pin(tokio::time::sleep(PENDING_AFFINITIES_INTERVAL).fuse());
770				},
771				_ = &mut self.sync_recovery_readd_timeout => {
772					self.try_readd_sync_recovery_peer();
773					self.sync_recovery_readd_timeout = Box::pin(pending().fuse());
774				},
775			}
776
777			if !self.sync.is_major_syncing() {
778				self.drain_deferred_peers();
779				self.start_sync_recovery();
780			}
781		}
782	}
783
784	/// Send a single chunk of statements to a peer.
785	///
786	/// Encodes the chunk according to the peer's protocol version:
787	/// - V1: raw `Vec<Statement>` encoding
788	/// - V2: `StatementMessage::Statements(...)` encoding
789	async fn send_statement_chunk(
790		&mut self,
791		peer: &PeerId,
792		statements: &[&Statement],
793	) -> SendChunkResult {
794		let Some(peer_data) = self.peers.get(peer) else {
795			log::error!(target: LOG_TARGET, "Peer {peer} not found in peers map during send_statement_chunk");
796			return SendChunkResult::Failed;
797		};
798		let peer_version = peer_data.protocol_version;
799		let envelope_overhead = peer_version.envelope_overhead();
800		match find_sendable_chunk(statements, envelope_overhead) {
801			ChunkResult::Send(0) => SendChunkResult::Empty,
802			ChunkResult::Send(chunk_end) => {
803				let chunk = &statements[..chunk_end];
804				let encoded = match peer_version {
805					PeerProtocolVersion::V1 => chunk.encode(),
806					PeerProtocolVersion::V2 => StatementMessage::encode_statement_refs(chunk),
807				};
808				let bytes_to_send = encoded.len() as u64;
809
810				let sent_latency_timer =
811					self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
812				let send_result = timeout(
813					SEND_TIMEOUT,
814					self.notification_service.send_async_notification(peer, encoded),
815				)
816				.await;
817				drop(sent_latency_timer);
818
819				if let Err(e) = send_result {
820					log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
821					return SendChunkResult::Failed;
822				}
823
824				log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
825				self.metrics.as_ref().map(|metrics| {
826					metrics.propagated_statements.inc_by(chunk.len() as u64);
827					metrics.bytes_sent_total.inc_by(bytes_to_send);
828					metrics.propagated_statements_chunks.observe(chunk.len() as f64);
829				});
830				SendChunkResult::Sent(chunk_end)
831			},
832			ChunkResult::SkipOversized => {
833				log::warn!(target: LOG_TARGET, "Statement too large, skipping");
834				self.metrics.as_ref().map(|metrics| {
835					metrics.skipped_oversized_statements.inc();
836				});
837				SendChunkResult::Skipped
838			},
839		}
840	}
841
842	/// Add all peers that were deferred during major sync to the reserved set
843	fn drain_deferred_peers(&mut self) {
844		if self.deferred_peers.is_empty() {
845			return;
846		}
847
848		log::debug!(
849			target: LOG_TARGET,
850			"Major sync complete, adding {} deferred statement peers",
851			self.deferred_peers.len(),
852		);
853
854		let addrs: HashSet<multiaddr::Multiaddr> = self
855			.deferred_peers
856			.drain()
857			.map(|p| {
858				iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
859			})
860			.collect();
861
862		if let Err(err) = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs)
863		{
864			log::warn!(target: LOG_TARGET, "Failed to add deferred peers: {err}");
865		}
866	}
867
868	/// Pick one connected peer, remove it from the reserved set (forcing a disconnect), and
869	/// schedule it for re-adding after `SYNC_RECOVERY_READD_DELAY`. When the peer reconnects it
870	/// performs a fresh initial sync, delivering any statements that were dropped while the
871	/// `is_major_syncing` guard was active
872	fn start_sync_recovery(&mut self) {
873		if !self.dropped_statements_during_sync {
874			return;
875		}
876		self.dropped_statements_during_sync = false;
877
878		if self.sync_recovery_peer.is_some() {
879			return;
880		}
881
882		let Some(&peer_id) = self.peers.keys().choose(&mut rand::thread_rng()) else {
883			return;
884		};
885
886		log::trace!(
887			target: LOG_TARGET,
888			"Major sync complete, force-reconnecting {peer_id} for statement recovery",
889		);
890
891		if let Err(err) = self.network.remove_peers_from_reserved_set(
892			self.protocol_name.clone(),
893			iter::once(peer_id).collect(),
894		) {
895			log::warn!(target: LOG_TARGET, "Failed to remove peer {peer_id} for sync recovery: {err}");
896			return;
897		}
898
899		self.sync_recovery_peer = Some(peer_id);
900		self.sync_recovery_readd_timeout =
901			Box::pin(tokio::time::sleep(SYNC_RECOVERY_READD_DELAY).fuse());
902	}
903
904	/// Re-adds the sync-recovery peer to the reserved set after the backoff window has elapsed
905	fn try_readd_sync_recovery_peer(&mut self) {
906		let Some(peer_id) = self.sync_recovery_peer.take() else { return };
907		log::trace!(
908			target: LOG_TARGET,
909			"Re-adding {peer_id} to reserved set after sync recovery window",
910		);
911		let addr =
912			iter::once(multiaddr::Protocol::P2p(peer_id.into())).collect::<multiaddr::Multiaddr>();
913		if let Err(err) = self
914			.network
915			.add_peers_to_reserved_set(self.protocol_name.clone(), iter::once(addr).collect())
916		{
917			log::warn!(target: LOG_TARGET, "Failed to re-add sync recovery peer {peer_id}: {err}");
918		}
919	}
920
921	fn handle_sync_event(&mut self, event: SyncEvent) {
922		match event {
923			SyncEvent::PeerConnected(remote) => {
924				if self.sync.is_major_syncing() {
925					log::trace!(
926						target: LOG_TARGET,
927						"Major sync in progress, deferring connection to {remote}",
928					);
929					self.deferred_peers.insert(remote);
930					return;
931				}
932				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
933					.collect::<multiaddr::Multiaddr>();
934				let result = self.network.add_peers_to_reserved_set(
935					self.protocol_name.clone(),
936					iter::once(addr).collect(),
937				);
938				if let Err(err) = result {
939					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
940				}
941			},
942			SyncEvent::PeerDisconnected(remote) => {
943				if self.deferred_peers.remove(&remote) {
944					return;
945				}
946				let result = self.network.remove_peers_from_reserved_set(
947					self.protocol_name.clone(),
948					iter::once(remote).collect(),
949				);
950				if let Err(err) = result {
951					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
952				}
953			},
954		}
955	}
956
957	async fn handle_notification_event(&mut self, event: NotificationEvent) {
958		match event {
959			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
960				// Only accept peers whose role can be determined
961				let result = self
962					.network
963					.peer_role(peer, handshake)
964					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
965				let _ = result_tx.send(result);
966			},
967			NotificationEvent::NotificationStreamOpened {
968				peer,
969				negotiated_fallback,
970				handshake,
971				..
972			} => {
973				// If negotiated_fallback is Some, the peer connected on a fallback protocol
974				// (v1). If None, the peer connected on the main protocol (v2).
975				let protocol_version = if negotiated_fallback.is_some() {
976					PeerProtocolVersion::V1
977				} else {
978					PeerProtocolVersion::V2
979				};
980				let Some(peer_role) = self.network.peer_role(peer, handshake) else {
981					log::debug!(
982						target: LOG_TARGET,
983						"Peer {peer} connected but role could not be determined, ignoring"
984					);
985					return;
986				};
987				let is_light = peer_role.is_light();
988				log::debug!(
989					target: LOG_TARGET,
990					"Peer {peer} connected with statement protocol {protocol_version:?}, role={peer_role:?}"
991				);
992				let _was_in = self.peers.insert(
993					peer,
994					Peer {
995						known_statements: LruHashSet::new(
996							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
997						),
998						rate_limiter: PeerRateLimiter::new(
999							self.statements_per_second,
1000							NonZeroU32::new(
1001								self.statements_per_second.get() *
1002									config::STATEMENTS_BURST_COEFFICIENT,
1003							)
1004							.expect("burst capacity is nonzero"),
1005						),
1006						protocol_version,
1007						topic_affinity: None,
1008						is_light,
1009						pending_topic_affinity: None,
1010					},
1011				);
1012				debug_assert!(_was_in.is_none());
1013
1014				self.metrics.as_ref().map(|metrics| {
1015					metrics.peers_connected.set(self.peers.len() as u64);
1016				});
1017
1018				// Light V2 peers must set topic affinity before receiving statements.
1019				// All other peers get initial sync immediately.
1020				if self.peers.get(&peer).map_or(false, |p| p.can_receive()) {
1021					self.schedule_initial_sync_for_peer(peer);
1022				}
1023			},
1024			NotificationEvent::NotificationStreamClosed { peer } => {
1025				let _peer = self.peers.remove(&peer);
1026				debug_assert!(_peer.is_some());
1027				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
1028					self.metrics.as_ref().map(|metrics| {
1029						metrics.initial_sync_peers_active.dec();
1030						metrics
1031							.initial_sync_duration_seconds
1032							.observe(pending.started_at.elapsed().as_secs_f64());
1033					});
1034				}
1035				self.initial_sync_peer_queue.retain(|p| *p != peer);
1036				self.metrics.as_ref().map(|metrics| {
1037					metrics.peers_connected.set(self.peers.len() as u64);
1038				});
1039			},
1040			NotificationEvent::NotificationReceived { peer, notification } => {
1041				let bytes_received = notification.len() as u64;
1042				self.metrics.as_ref().map(|metrics| {
1043					metrics.bytes_received_total.inc_by(bytes_received);
1044				});
1045
1046				// Accept statements only when node is not major syncing
1047				if self.sync.is_major_syncing() {
1048					log::trace!(
1049						target: LOG_TARGET,
1050						"{peer}: Ignoring statements while major syncing or offline"
1051					);
1052					self.dropped_statements_during_sync = true;
1053					return;
1054				}
1055
1056				let Some(peer_data) = self.peers.get(&peer) else {
1057					log::error!(target: LOG_TARGET, "Received notification from unknown peer {peer}");
1058					return;
1059				};
1060
1061				match peer_data.protocol_version {
1062					PeerProtocolVersion::V1 => {
1063						// V1 peers send raw Vec<Statement>.
1064						if let Ok(statements) =
1065							<Statements as Decode>::decode(&mut notification.as_ref())
1066						{
1067							self.on_statements(peer, statements);
1068						} else {
1069							log::debug!(
1070								target: LOG_TARGET,
1071								"Failed to decode v1 statement list from {peer}"
1072							);
1073							self.network.report_peer(peer, rep::BAD_MESSAGE);
1074						}
1075					},
1076					PeerProtocolVersion::V2 => {
1077						// V2 peers send StatementMessage enum.
1078						if let Ok(message) = StatementMessage::decode(&mut notification.as_ref()) {
1079							match message {
1080								StatementMessage::Statements(statements) => {
1081									self.on_statements(peer, statements)
1082								},
1083								StatementMessage::ExplicitTopicAffinity(filter) => {
1084									if let Some(peer_data) = self.peers.get_mut(&peer) {
1085										if peer_data.rate_limiter.is_flooding(1) {
1086											log::debug!(
1087												target: LOG_TARGET,
1088												"Rate-limiting ExplicitTopicAffinity from {peer}"
1089											);
1090											self.network.report_peer(peer, rep::BAD_MESSAGE);
1091										} else {
1092											log::debug!(
1093												target: LOG_TARGET,
1094												"Received topic affinity filter from {peer}"
1095											);
1096											// Defer both the affinity update and sync scheduling
1097											// to the main loop tick.
1098											peer_data.pending_topic_affinity = Some(filter);
1099										}
1100									}
1101								},
1102							}
1103						} else {
1104							log::debug!(
1105								target: LOG_TARGET,
1106								"Failed to decode v2 statement message from {peer}"
1107							);
1108							self.network.report_peer(peer, rep::BAD_MESSAGE);
1109						}
1110					},
1111				}
1112			},
1113		}
1114	}
1115
1116	/// Called when peer sends us new statements
1117	#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
1118	pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
1119		log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
1120
1121		self.metrics.as_ref().map(|metrics| {
1122			metrics.statements_received.inc_by(statements.len() as u64);
1123		});
1124
1125		if let Some(ref mut peer) = self.peers.get_mut(&who) {
1126			if peer.rate_limiter.is_flooding(statements.len()) {
1127				log::warn!(
1128					target: LOG_TARGET,
1129					"Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
1130					who,
1131					self.statements_per_second
1132				);
1133
1134				self.network.report_peer(who, rep::STATEMENT_FLOODING);
1135
1136				// Initiate peer state cleanup in the `NotificationStreamClosed` handler
1137				self.network.disconnect_peer(who, self.protocol_name.clone());
1138
1139				if let Some(ref metrics) = self.metrics {
1140					metrics.statement_flooding_detected.inc();
1141				}
1142
1143				return;
1144			}
1145
1146			let mut statements_left = statements.len() as u64;
1147			for s in statements {
1148				if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
1149					log::debug!(
1150						target: LOG_TARGET,
1151						"Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
1152						statements_left,
1153						MAX_PENDING_STATEMENTS,
1154					);
1155					self.metrics.as_ref().map(|metrics| {
1156						metrics.ignored_statements.inc_by(statements_left);
1157					});
1158					break;
1159				}
1160
1161				let hash = s.hash();
1162				peer.known_statements.insert(hash);
1163
1164				if self.statement_store.has_statement(&hash) {
1165					self.metrics.as_ref().map(|metrics| {
1166						metrics.known_statements_received.inc();
1167					});
1168
1169					if let Some(peers) = self.pending_statements_peers.get(&hash) {
1170						if peers.contains(&who) {
1171							log::trace!(
1172								target: LOG_TARGET,
1173								"Already received the statement from the same peer {who}.",
1174							);
1175							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
1176						}
1177					}
1178					continue;
1179				}
1180
1181				self.network.report_peer(who, rep::ANY_STATEMENT);
1182
1183				match self.pending_statements_peers.entry(hash) {
1184					Entry::Vacant(entry) => {
1185						let (completion_sender, completion_receiver) = oneshot::channel();
1186						match self.queue_sender.try_send((s, completion_sender)) {
1187							Ok(()) => {
1188								self.pending_statements.push(
1189									async move {
1190										let res = completion_receiver.await;
1191										(hash, res.ok())
1192									}
1193									.boxed(),
1194								);
1195								entry.insert(HashSet::from_iter([who]));
1196							},
1197							Err(async_channel::TrySendError::Full(_)) => {
1198								log::debug!(
1199									target: LOG_TARGET,
1200									"Dropped statement because validation channel is full",
1201								);
1202							},
1203							Err(async_channel::TrySendError::Closed(_)) => {
1204								log::trace!(
1205									target: LOG_TARGET,
1206									"Dropped statement because validation channel is closed",
1207								);
1208							},
1209						}
1210					},
1211					Entry::Occupied(mut entry) => {
1212						if !entry.get_mut().insert(who) {
1213							// Already received this from the same peer.
1214							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
1215						}
1216					},
1217				}
1218
1219				statements_left -= 1;
1220			}
1221		}
1222	}
1223
1224	fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
1225		match import {
1226			SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
1227			SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
1228			SubmitResult::KnownExpired => {},
1229			SubmitResult::Rejected(_) => {},
1230			SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
1231			SubmitResult::InternalError(_) => {},
1232		}
1233	}
1234
1235	/// Propagate one statement.
1236	pub async fn propagate_statement(&mut self, hash: &Hash) {
1237		// Accept statements only when node is not major syncing
1238		if self.sync.is_major_syncing() {
1239			return;
1240		}
1241
1242		log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
1243		if let Ok(Some(statement)) = self.statement_store.statement(hash) {
1244			self.do_propagate_statements(&[(*hash, statement)]).await;
1245		}
1246	}
1247
1248	/// Propagate the given `statements` to the given `peer`.
1249	///
1250	/// Internally filters `statements` to only send unknown statements to the peer.
1251	/// For v2 peers with a topic affinity filter, also filters by topic match.
1252	async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
1253		let Some(peer) = self.peers.get_mut(who) else {
1254			return;
1255		};
1256
1257		if !peer.can_receive() {
1258			return;
1259		}
1260
1261		let to_send: Vec<_> = statements
1262			.iter()
1263			.filter_map(|(hash, stmt)| {
1264				if peer.known_statements.contains(hash) {
1265					return None;
1266				}
1267				// For v2 peers with topic affinity, filter by topic match.
1268				// Don't mark filtered statements as known so they can be retried
1269				// when the peer's affinity changes.
1270				if peer.topic_affinity.as_ref().is_some_and(|a| !a.matches_statement(stmt)) {
1271					return None;
1272				}
1273				peer.known_statements.insert(*hash);
1274				Some(stmt)
1275			})
1276			.collect();
1277
1278		log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
1279
1280		if to_send.is_empty() {
1281			return;
1282		}
1283
1284		self.send_statements_in_chunks(who, &to_send).await;
1285	}
1286
1287	/// Send statements to a peer in chunks, respecting the maximum notification size.
1288	async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
1289		let mut offset = 0;
1290		while offset < statements.len() {
1291			match self.send_statement_chunk(who, &statements[offset..]).await {
1292				SendChunkResult::Sent(chunk_end) => {
1293					offset += chunk_end;
1294				},
1295				SendChunkResult::Skipped => {
1296					offset += 1;
1297				},
1298				SendChunkResult::Empty | SendChunkResult::Failed => return,
1299			}
1300		}
1301	}
1302
1303	async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
1304		log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
1305		let peers: Vec<_> = self.peers.keys().copied().collect();
1306		for who in peers {
1307			log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
1308			self.send_statements_to_peer(&who, statements).await;
1309		}
1310		log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
1311	}
1312
1313	/// Call when we must propagate ready statements to peers.
1314	async fn propagate_statements(&mut self) {
1315		// Send out statements only when node is not major syncing
1316		if self.sync.is_major_syncing() {
1317			return;
1318		}
1319
1320		let Ok(statements) = self.statement_store.take_recent_statements() else { return };
1321		if !statements.is_empty() {
1322			self.do_propagate_statements(&statements).await;
1323		}
1324	}
1325
1326	/// Schedule an initial sync for a peer, sending all known statements.
1327	///
1328	/// This is called both when a new peer connects and when a peer's topic
1329	/// affinity changes (so that newly-matching statements get sent).
1330	/// If the peer already has a pending initial sync, it is replaced.
1331	fn schedule_initial_sync_for_peer(&mut self, peer: PeerId) {
1332		// If there's already a pending sync, clean it up first.
1333		if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
1334			self.record_initial_sync_completion(pending.started_at);
1335			self.initial_sync_peer_queue.retain(|p| *p != peer);
1336		}
1337		let hashes = self.statement_store.statement_hashes();
1338		// Clear known statements so that all statements are redelivered when
1339		// explicit affinity changes, this is necessary because light nodes change
1340		// their affinity without disconnecting, and we want them to receive all matching
1341		// statements, so they can deliver them to their active subscriptions.
1342		if let Some(peer_data) = self.peers.get_mut(&peer) {
1343			peer_data.known_statements.clear();
1344		}
1345		if !hashes.is_empty() {
1346			self.pending_initial_syncs
1347				.insert(peer, PendingInitialSync { hashes, started_at: Instant::now() });
1348			self.initial_sync_peer_queue.push_back(peer);
1349			self.metrics.as_ref().map(|metrics| {
1350				metrics.initial_sync_peers_active.inc();
1351			});
1352		}
1353	}
1354
1355	/// Process pending topic affinity changes for peers that have no active initial sync.
1356	///
1357	/// When a peer sends `ExplicitTopicAffinity`, we defer the expensive
1358	/// `schedule_initial_sync_for_peer` call. This method applies the pending affinity
1359	/// and schedules the sync once the peer's current sync (if any) has completed.
1360	fn process_pending_affinities(&mut self) {
1361		let ready_peers: Vec<PeerId> = self
1362			.peers
1363			.iter()
1364			.filter(|(peer_id, peer_data)| {
1365				peer_data.pending_topic_affinity.is_some() &&
1366					!self.pending_initial_syncs.contains_key(peer_id)
1367			})
1368			.map(|(peer_id, _)| *peer_id)
1369			.collect();
1370
1371		for peer_id in ready_peers {
1372			if let Some(peer_data) = self.peers.get_mut(&peer_id) {
1373				peer_data.topic_affinity = peer_data.pending_topic_affinity.take();
1374			}
1375			self.schedule_initial_sync_for_peer(peer_id);
1376		}
1377	}
1378
1379	/// Record initial sync completion metrics for a peer being removed.
1380	fn record_initial_sync_completion(&self, started_at: Instant) {
1381		self.metrics.as_ref().map(|metrics| {
1382			metrics.initial_sync_peers_active.dec();
1383			metrics
1384				.initial_sync_duration_seconds
1385				.observe(started_at.elapsed().as_secs_f64());
1386		});
1387	}
1388
1389	/// Process one batch of initial sync for the next peer in the queue (round-robin).
1390	async fn process_initial_sync_burst(&mut self) {
1391		if self.sync.is_major_syncing() {
1392			return;
1393		}
1394
1395		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
1396			return;
1397		};
1398
1399		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
1400			return;
1401		};
1402
1403		self.metrics.as_ref().map(|metrics| {
1404			metrics.initial_sync_bursts_total.inc();
1405		});
1406
1407		if entry.get().hashes.is_empty() {
1408			let started_at = entry.get().started_at;
1409			entry.remove();
1410			self.record_initial_sync_completion(started_at);
1411			return;
1412		}
1413
1414		// Fetch statements up to max_statement_payload_size, skipping statements the peer
1415		// already knows or that don't match its topic affinity directly in the callback.
1416		// This avoids materializing non-matching statements and lets each batch carry more
1417		// useful data.
1418		let Some(peer_data) = self.peers.get(&peer_id) else {
1419			log::error!(target: LOG_TARGET, "Peer {peer_id} has pending initial sync but is not in peers map");
1420			entry.remove();
1421			return;
1422		};
1423		let envelope_overhead = peer_data.protocol_version.envelope_overhead();
1424		let max_size = max_statement_payload_size(envelope_overhead);
1425		let mut accumulated_size = 0;
1426		let (statements, processed) = match self.statement_store.statements_by_hashes(
1427			&entry.get().hashes,
1428			&mut |hash, encoded, stmt| {
1429				// Skip statements the peer already knows or that don't match its topic
1430				// affinity. This avoids materializing non-matching statements and lets
1431				// each batch carry more useful data.
1432				if peer_data.known_statements.contains(hash) {
1433					return FilterDecision::Skip;
1434				}
1435				if peer_data.topic_affinity.as_ref().is_some_and(|a| !a.matches_statement(stmt)) {
1436					return FilterDecision::Skip;
1437				}
1438				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
1439					return FilterDecision::Abort;
1440				}
1441				accumulated_size += encoded.len();
1442				FilterDecision::Take
1443			},
1444		) {
1445			Ok(r) => r,
1446			Err(e) => {
1447				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
1448				let started_at = entry.get().started_at;
1449				entry.remove();
1450				self.record_initial_sync_completion(started_at);
1451				return;
1452			},
1453		};
1454
1455		// Drain processed hashes and check if more remain
1456		entry.get_mut().hashes.drain(..processed);
1457		let has_more = !entry.get().hashes.is_empty();
1458		drop(entry);
1459
1460		let send_stmts: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
1461		match self.send_statement_chunk(&peer_id, &send_stmts).await {
1462			SendChunkResult::Failed => {
1463				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1464					self.record_initial_sync_completion(pending.started_at);
1465				}
1466				return;
1467			},
1468			SendChunkResult::Sent(sent) => {
1469				debug_assert_eq!(send_stmts.len(), sent);
1470				self.metrics.as_ref().map(|metrics| {
1471					metrics.initial_sync_statements_sent.inc_by(sent as u64);
1472				});
1473				// Mark statements as known
1474				if let Some(peer) = self.peers.get_mut(&peer_id) {
1475					for (hash, _) in &statements {
1476						peer.known_statements.insert(*hash);
1477					}
1478				}
1479			},
1480			SendChunkResult::Empty | SendChunkResult::Skipped => {},
1481		}
1482
1483		// Re-queue if more hashes remain
1484		if has_more {
1485			self.initial_sync_peer_queue.push_back(peer_id);
1486		} else {
1487			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1488				self.record_initial_sync_completion(pending.started_at);
1489			}
1490		}
1491	}
1492}
1493
1494#[cfg(test)]
1495mod tests {
1496
1497	use super::*;
1498	use std::sync::{
1499		atomic::{AtomicBool, Ordering},
1500		Mutex,
1501	};
1502
1503	/// Default seed used for bloom filters in tests.
1504	const BLOOM_SEED: u128 = 0x5EED_5EED_5EED_5EED;
1505
1506	#[derive(Clone)]
1507	struct TestNetwork {
1508		reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
1509		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
1510		/// Role to return from `peer_role`. Default: `Full`.
1511		default_role: sc_network::ObservedRole,
1512		added_reserved: Arc<Mutex<Vec<HashSet<sc_network::Multiaddr>>>>,
1513		removed_reserved: Arc<Mutex<Vec<Vec<PeerId>>>>,
1514	}
1515
1516	impl TestNetwork {
1517		fn new() -> Self {
1518			Self {
1519				reported_peers: Arc::new(Mutex::new(Vec::new())),
1520				disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1521				default_role: sc_network::ObservedRole::Full,
1522				added_reserved: Arc::new(Mutex::new(Vec::new())),
1523				removed_reserved: Arc::new(Mutex::new(Vec::new())),
1524			}
1525		}
1526
1527		fn new_light() -> Self {
1528			Self {
1529				reported_peers: Arc::new(Mutex::new(Vec::new())),
1530				disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1531				default_role: sc_network::ObservedRole::Light,
1532				added_reserved: Arc::new(Mutex::new(Vec::new())),
1533				removed_reserved: Arc::new(Mutex::new(Vec::new())),
1534			}
1535		}
1536
1537		fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
1538			self.reported_peers.lock().unwrap().clone()
1539		}
1540
1541		fn get_disconnected_peers(&self) -> Vec<PeerId> {
1542			self.disconnected_peers.lock().unwrap().clone()
1543		}
1544
1545		fn get_added_reserved(&self) -> Vec<HashSet<sc_network::Multiaddr>> {
1546			self.added_reserved.lock().unwrap().clone()
1547		}
1548
1549		fn get_removed_reserved(&self) -> Vec<Vec<PeerId>> {
1550			self.removed_reserved.lock().unwrap().clone()
1551		}
1552	}
1553
1554	#[async_trait::async_trait]
1555	impl NetworkPeers for TestNetwork {
1556		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
1557			unimplemented!()
1558		}
1559
1560		fn set_authorized_only(&self, _: bool) {
1561			unimplemented!()
1562		}
1563
1564		fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
1565			unimplemented!()
1566		}
1567
1568		fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
1569			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
1570		}
1571
1572		fn peer_reputation(&self, _: &PeerId) -> i32 {
1573			unimplemented!()
1574		}
1575
1576		fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
1577			self.disconnected_peers.lock().unwrap().push(peer);
1578		}
1579
1580		fn accept_unreserved_peers(&self) {
1581			unimplemented!()
1582		}
1583
1584		fn deny_unreserved_peers(&self) {
1585			unimplemented!()
1586		}
1587
1588		fn add_reserved_peer(
1589			&self,
1590			_: sc_network::config::MultiaddrWithPeerId,
1591		) -> Result<(), String> {
1592			unimplemented!()
1593		}
1594
1595		fn remove_reserved_peer(&self, _: PeerId) {
1596			unimplemented!()
1597		}
1598
1599		fn set_reserved_peers(
1600			&self,
1601			_: sc_network::ProtocolName,
1602			_: std::collections::HashSet<sc_network::Multiaddr>,
1603		) -> Result<(), String> {
1604			unimplemented!()
1605		}
1606
1607		fn add_peers_to_reserved_set(
1608			&self,
1609			_: sc_network::ProtocolName,
1610			addrs: std::collections::HashSet<sc_network::Multiaddr>,
1611		) -> Result<(), String> {
1612			self.added_reserved.lock().unwrap().push(addrs);
1613			Ok(())
1614		}
1615
1616		fn remove_peers_from_reserved_set(
1617			&self,
1618			_: sc_network::ProtocolName,
1619			peers: Vec<PeerId>,
1620		) -> Result<(), String> {
1621			self.removed_reserved.lock().unwrap().push(peers);
1622			Ok(())
1623		}
1624
1625		fn sync_num_connected(&self) -> usize {
1626			unimplemented!()
1627		}
1628
1629		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
1630			Some(self.default_role)
1631		}
1632
1633		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
1634			unimplemented!();
1635		}
1636	}
1637
1638	#[derive(Clone)]
1639	struct TestSync {
1640		major_syncing: Arc<AtomicBool>,
1641	}
1642
1643	impl TestSync {
1644		fn new() -> Self {
1645			Self { major_syncing: Arc::new(AtomicBool::new(false)) }
1646		}
1647
1648		fn with_syncing(initial: bool) -> (Self, Arc<AtomicBool>) {
1649			let flag = Arc::new(AtomicBool::new(initial));
1650			(Self { major_syncing: flag.clone() }, flag)
1651		}
1652	}
1653
1654	impl SyncEventStream for TestSync {
1655		fn event_stream(
1656			&self,
1657			_name: &'static str,
1658		) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
1659			Box::pin(futures::stream::pending())
1660		}
1661	}
1662
1663	impl sp_consensus::SyncOracle for TestSync {
1664		fn is_major_syncing(&self) -> bool {
1665			self.major_syncing.load(Ordering::Relaxed)
1666		}
1667
1668		fn is_offline(&self) -> bool {
1669			unimplemented!()
1670		}
1671	}
1672
1673	impl NetworkEventStream for TestNetwork {
1674		fn event_stream(
1675			&self,
1676			_name: &'static str,
1677		) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
1678			unimplemented!()
1679		}
1680	}
1681
1682	#[derive(Debug, Clone)]
1683	struct TestNotificationService {
1684		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
1685	}
1686
1687	impl TestNotificationService {
1688		fn new() -> Self {
1689			Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1690		}
1691
1692		fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1693			self.sent_notifications.lock().unwrap().clone()
1694		}
1695
1696		fn clear_sent_notifications(&self) {
1697			self.sent_notifications.lock().unwrap().clear();
1698		}
1699	}
1700
1701	#[async_trait::async_trait]
1702	impl NotificationService for TestNotificationService {
1703		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1704			unimplemented!()
1705		}
1706
1707		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1708			unimplemented!()
1709		}
1710
1711		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
1712			self.sent_notifications.lock().unwrap().push((*peer, notification));
1713		}
1714
1715		async fn send_async_notification(
1716			&mut self,
1717			peer: &PeerId,
1718			notification: Vec<u8>,
1719		) -> Result<(), sc_network::error::Error> {
1720			self.sent_notifications.lock().unwrap().push((*peer, notification));
1721			Ok(())
1722		}
1723
1724		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1725			unimplemented!()
1726		}
1727
1728		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1729			unimplemented!()
1730		}
1731
1732		async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
1733			None
1734		}
1735
1736		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
1737			unimplemented!()
1738		}
1739
1740		fn protocol(&self) -> &sc_network::types::ProtocolName {
1741			unimplemented!()
1742		}
1743
1744		fn message_sink(
1745			&self,
1746			_peer: &PeerId,
1747		) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
1748			unimplemented!()
1749		}
1750	}
1751
1752	#[derive(Clone)]
1753	struct TestStatementStore {
1754		statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1755		recent_statements:
1756			Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1757	}
1758
1759	impl TestStatementStore {
1760		fn new() -> Self {
1761			Self { statements: Default::default(), recent_statements: Default::default() }
1762		}
1763	}
1764
1765	impl StatementStore for TestStatementStore {
1766		fn statements(
1767			&self,
1768		) -> sp_statement_store::Result<
1769			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1770		> {
1771			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
1772		}
1773
1774		fn take_recent_statements(
1775			&self,
1776		) -> sp_statement_store::Result<
1777			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1778		> {
1779			Ok(self.recent_statements.lock().unwrap().drain().collect())
1780		}
1781
1782		fn statement(
1783			&self,
1784			_hash: &sp_statement_store::Hash,
1785		) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
1786			unimplemented!()
1787		}
1788
1789		fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
1790			self.statements.lock().unwrap().contains_key(hash)
1791		}
1792
1793		fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
1794			self.statements.lock().unwrap().keys().cloned().collect()
1795		}
1796
1797		fn statements_by_hashes(
1798			&self,
1799			hashes: &[sp_statement_store::Hash],
1800			filter: &mut dyn FnMut(
1801				&sp_statement_store::Hash,
1802				&[u8],
1803				&sp_statement_store::Statement,
1804			) -> FilterDecision,
1805		) -> sp_statement_store::Result<(
1806			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1807			usize,
1808		)> {
1809			let statements = self.statements.lock().unwrap();
1810			let mut result = Vec::new();
1811			let mut processed = 0;
1812			for hash in hashes {
1813				let Some(stmt) = statements.get(hash) else {
1814					processed += 1;
1815					continue;
1816				};
1817				let encoded = stmt.encode();
1818				match filter(hash, &encoded, stmt) {
1819					FilterDecision::Skip => {
1820						processed += 1;
1821					},
1822					FilterDecision::Take => {
1823						processed += 1;
1824						result.push((*hash, stmt.clone()));
1825					},
1826					FilterDecision::Abort => break,
1827				}
1828			}
1829			Ok((result, processed))
1830		}
1831
1832		fn broadcasts(
1833			&self,
1834			_match_all_topics: &[sp_statement_store::Topic],
1835		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1836			unimplemented!()
1837		}
1838
1839		fn posted(
1840			&self,
1841			_match_all_topics: &[sp_statement_store::Topic],
1842			_dest: [u8; 32],
1843		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1844			unimplemented!()
1845		}
1846
1847		fn posted_clear(
1848			&self,
1849			_match_all_topics: &[sp_statement_store::Topic],
1850			_dest: [u8; 32],
1851		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1852			unimplemented!()
1853		}
1854
1855		fn broadcasts_stmt(
1856			&self,
1857			_match_all_topics: &[sp_statement_store::Topic],
1858		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1859			unimplemented!()
1860		}
1861
1862		fn posted_stmt(
1863			&self,
1864			_match_all_topics: &[sp_statement_store::Topic],
1865			_dest: [u8; 32],
1866		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1867			unimplemented!()
1868		}
1869
1870		fn posted_clear_stmt(
1871			&self,
1872			_match_all_topics: &[sp_statement_store::Topic],
1873			_dest: [u8; 32],
1874		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1875			unimplemented!()
1876		}
1877
1878		fn submit(
1879			&self,
1880			_statement: sp_statement_store::Statement,
1881			_source: sp_statement_store::StatementSource,
1882		) -> sp_statement_store::SubmitResult {
1883			unimplemented!()
1884		}
1885
1886		fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
1887			unimplemented!()
1888		}
1889
1890		fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
1891			unimplemented!()
1892		}
1893	}
1894
1895	fn build_handler(
1896		num_peers: usize,
1897	) -> (
1898		StatementHandler<TestNetwork, TestSync>,
1899		TestStatementStore,
1900		TestNetwork,
1901		TestNotificationService,
1902		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
1903		Vec<PeerId>,
1904	) {
1905		let statement_store = TestStatementStore::new();
1906		let (queue_sender, queue_receiver) = async_channel::bounded(100);
1907		let network = TestNetwork::new();
1908		let notification_service = TestNotificationService::new();
1909		let mut peers = HashMap::new();
1910		let mut peer_ids = Vec::with_capacity(num_peers);
1911
1912		for _ in 0..num_peers {
1913			let peer_id = PeerId::random();
1914			peer_ids.push(peer_id);
1915			peers.insert(
1916				peer_id,
1917				Peer {
1918					known_statements: LruHashSet::new(NonZeroUsize::new(1000).unwrap()),
1919					rate_limiter: PeerRateLimiter::new(
1920						NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1921							.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1922						NonZeroU32::new(
1923							DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
1924						)
1925						.expect("burst capacity is nonzero"),
1926					),
1927					protocol_version: PeerProtocolVersion::V1,
1928					topic_affinity: None,
1929					is_light: false,
1930					pending_topic_affinity: None,
1931				},
1932			);
1933		}
1934
1935		let handler = StatementHandler {
1936			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
1937			notification_service: Box::new(notification_service.clone()),
1938			propagate_timeout: (Box::pin(futures::stream::pending())
1939				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1940				.fuse(),
1941			pending_statements: FuturesUnordered::new(),
1942			pending_statements_peers: HashMap::new(),
1943			network: network.clone(),
1944			sync: TestSync::new(),
1945			sync_event_stream: (Box::pin(futures::stream::pending())
1946				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1947				.fuse(),
1948			peers,
1949			statement_store: Arc::new(statement_store.clone()),
1950			queue_sender,
1951			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1952				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1953			metrics: None,
1954			initial_sync_timeout: Box::pin(futures::future::pending()),
1955			pending_affinities_timeout: Box::pin(futures::future::pending()),
1956			pending_initial_syncs: HashMap::new(),
1957			initial_sync_peer_queue: VecDeque::new(),
1958			deferred_peers: HashSet::new(),
1959			dropped_statements_during_sync: false,
1960			sync_recovery_peer: None,
1961			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
1962		};
1963		(handler, statement_store, network, notification_service, queue_receiver, peer_ids)
1964	}
1965
1966	fn get_peer_hashes(sent: &[(PeerId, Vec<u8>)], peer: PeerId) -> Vec<sp_statement_store::Hash> {
1967		sent.iter()
1968			.filter(|(p, _)| *p == peer)
1969			.flat_map(|(_, notification)| {
1970				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1971			})
1972			.map(|s| s.hash())
1973			.collect()
1974	}
1975
1976	/// Simulate the network closing the substream for every disconnected
1977	/// peer, so the handler runs its per-peer cleanup.
1978	async fn dispatch_disconnects(
1979		handler: &mut StatementHandler<TestNetwork, TestSync>,
1980		network: &TestNetwork,
1981	) {
1982		for peer in network.get_disconnected_peers() {
1983			handler
1984				.handle_notification_event(NotificationEvent::NotificationStreamClosed { peer })
1985				.await;
1986		}
1987	}
1988
1989	#[tokio::test]
1990	async fn test_skips_processing_statements_that_already_in_store() {
1991		let (mut handler, statement_store, _network, _notification_service, queue_receiver, _) =
1992			build_handler(1);
1993
1994		let mut statement1 = Statement::new();
1995		statement1.set_plain_data(b"statement1".to_vec());
1996		let hash1 = statement1.hash();
1997
1998		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
1999
2000		let mut statement2 = Statement::new();
2001		statement2.set_plain_data(b"statement2".to_vec());
2002		let hash2 = statement2.hash();
2003
2004		let peer_id = *handler.peers.keys().next().unwrap();
2005
2006		handler.on_statements(peer_id, vec![statement1, statement2]);
2007
2008		let to_submit = queue_receiver.try_recv();
2009		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
2010
2011		let no_more = queue_receiver.try_recv();
2012		assert!(no_more.is_err(), "Expected only one statement to be queued");
2013	}
2014
2015	#[tokio::test]
2016	async fn test_reports_for_duplicate_statements() {
2017		let (mut handler, statement_store, network, _notification_service, queue_receiver, _) =
2018			build_handler(1);
2019
2020		let peer_id = *handler.peers.keys().next().unwrap();
2021
2022		let mut statement1 = Statement::new();
2023		statement1.set_plain_data(b"statement1".to_vec());
2024
2025		handler.on_statements(peer_id, vec![statement1.clone()]);
2026		{
2027			// Manually process statements submission
2028			let (s, _) = queue_receiver.try_recv().unwrap();
2029			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
2030			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
2031		}
2032
2033		handler.on_statements(peer_id, vec![statement1]);
2034
2035		let reports = network.get_reports();
2036		assert_eq!(
2037			reports,
2038			vec![
2039				(peer_id, rep::ANY_STATEMENT),        // Report for first statement
2040				(peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement
2041				(peer_id, rep::DUPLICATE_STATEMENT)   // Report for duplicate statement
2042			],
2043			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
2044			reports
2045		);
2046	}
2047
2048	#[tokio::test]
2049	async fn test_splits_large_batches_into_smaller_chunks() {
2050		let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
2051			build_handler(1);
2052
2053		let num_statements = 30;
2054		let statement_size = 100 * 1024; // 100KB per statement
2055		for i in 0..num_statements {
2056			let mut statement = Statement::new();
2057			let mut data = vec![0u8; statement_size];
2058			data[0] = i as u8;
2059			statement.set_plain_data(data);
2060			let hash = statement.hash();
2061			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
2062		}
2063
2064		handler.propagate_statements().await;
2065
2066		let sent = notification_service.get_sent_notifications();
2067		let mut total_statements_sent = 0;
2068		assert!(
2069			sent.len() == 3,
2070			"Expected batch to be split into 3 chunks, but got {} chunks",
2071			sent.len()
2072		);
2073		for (_peer, notification) in sent.iter() {
2074			assert!(
2075				notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
2076				"Notification size {} exceeds limit {}",
2077				notification.len(),
2078				MAX_STATEMENT_NOTIFICATION_SIZE
2079			);
2080			if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
2081				total_statements_sent += stmts.len();
2082			}
2083		}
2084
2085		assert_eq!(
2086			total_statements_sent, num_statements,
2087			"Expected all {} statements to be sent, but only {} were sent",
2088			num_statements, total_statements_sent
2089		);
2090	}
2091
2092	#[tokio::test]
2093	async fn test_skips_only_oversized_statements() {
2094		let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
2095			build_handler(1);
2096
2097		let mut statement1 = Statement::new();
2098		statement1.set_plain_data(vec![1u8; 100]);
2099		let hash1 = statement1.hash();
2100		statement_store
2101			.recent_statements
2102			.lock()
2103			.unwrap()
2104			.insert(hash1, statement1.clone());
2105
2106		let mut oversized1 = Statement::new();
2107		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
2108		let hash_oversized1 = oversized1.hash();
2109		statement_store
2110			.recent_statements
2111			.lock()
2112			.unwrap()
2113			.insert(hash_oversized1, oversized1);
2114
2115		let mut statement2 = Statement::new();
2116		statement2.set_plain_data(vec![3u8; 100]);
2117		let hash2 = statement2.hash();
2118		statement_store
2119			.recent_statements
2120			.lock()
2121			.unwrap()
2122			.insert(hash2, statement2.clone());
2123
2124		let mut oversized2 = Statement::new();
2125		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
2126		let hash_oversized2 = oversized2.hash();
2127		statement_store
2128			.recent_statements
2129			.lock()
2130			.unwrap()
2131			.insert(hash_oversized2, oversized2);
2132
2133		let mut statement3 = Statement::new();
2134		statement3.set_plain_data(vec![5u8; 100]);
2135		let hash3 = statement3.hash();
2136		statement_store
2137			.recent_statements
2138			.lock()
2139			.unwrap()
2140			.insert(hash3, statement3.clone());
2141
2142		handler.propagate_statements().await;
2143
2144		let sent = notification_service.get_sent_notifications();
2145
2146		let mut sent_hashes = sent
2147			.iter()
2148			.flat_map(|(_peer, notification)| {
2149				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2150			})
2151			.map(|s| s.hash())
2152			.collect::<Vec<_>>();
2153		sent_hashes.sort();
2154		let mut expected_hashes = vec![hash1, hash2, hash3];
2155		expected_hashes.sort();
2156		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
2157	}
2158
2159	fn build_handler_no_peers() -> (
2160		StatementHandler<TestNetwork, TestSync>,
2161		TestStatementStore,
2162		TestNetwork,
2163		TestNotificationService,
2164	) {
2165		let statement_store = TestStatementStore::new();
2166		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
2167		let network = TestNetwork::new();
2168		let notification_service = TestNotificationService::new();
2169
2170		let handler = StatementHandler {
2171			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
2172			notification_service: Box::new(notification_service.clone()),
2173			propagate_timeout: (Box::pin(futures::stream::pending())
2174				as Pin<Box<dyn Stream<Item = ()> + Send>>)
2175				.fuse(),
2176			pending_statements: FuturesUnordered::new(),
2177			pending_statements_peers: HashMap::new(),
2178			network: network.clone(),
2179			sync: TestSync::new(),
2180			sync_event_stream: (Box::pin(futures::stream::pending())
2181				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2182				.fuse(),
2183			peers: HashMap::new(),
2184			statement_store: Arc::new(statement_store.clone()),
2185			queue_sender,
2186			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2187				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2188			metrics: None,
2189			initial_sync_timeout: Box::pin(futures::future::pending()),
2190			pending_affinities_timeout: Box::pin(futures::future::pending()),
2191			pending_initial_syncs: HashMap::new(),
2192			initial_sync_peer_queue: VecDeque::new(),
2193			deferred_peers: HashSet::new(),
2194			dropped_statements_during_sync: false,
2195			sync_recovery_peer: None,
2196			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
2197		};
2198		(handler, statement_store, network, notification_service)
2199	}
2200
2201	/// Like `build_handler_no_peers` but the network mock returns `Light` for peer roles.
2202	fn build_handler_no_peers_light() -> (
2203		StatementHandler<TestNetwork, TestSync>,
2204		TestStatementStore,
2205		TestNetwork,
2206		TestNotificationService,
2207	) {
2208		let statement_store = TestStatementStore::new();
2209		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
2210		let network = TestNetwork::new_light();
2211		let notification_service = TestNotificationService::new();
2212
2213		let handler = StatementHandler {
2214			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
2215			notification_service: Box::new(notification_service.clone()),
2216			propagate_timeout: (Box::pin(futures::stream::pending())
2217				as Pin<Box<dyn Stream<Item = ()> + Send>>)
2218				.fuse(),
2219			pending_statements: FuturesUnordered::new(),
2220			pending_statements_peers: HashMap::new(),
2221			network: network.clone(),
2222			sync: TestSync::new(),
2223			sync_event_stream: (Box::pin(futures::stream::pending())
2224				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2225				.fuse(),
2226			peers: HashMap::new(),
2227			statement_store: Arc::new(statement_store.clone()),
2228			queue_sender,
2229			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2230				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2231			metrics: None,
2232			initial_sync_timeout: Box::pin(futures::future::pending()),
2233			pending_affinities_timeout: Box::pin(futures::future::pending()),
2234			pending_initial_syncs: HashMap::new(),
2235			initial_sync_peer_queue: VecDeque::new(),
2236			deferred_peers: HashSet::new(),
2237			dropped_statements_during_sync: false,
2238			sync_recovery_peer: None,
2239			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
2240		};
2241		(handler, statement_store, network, notification_service)
2242	}
2243
2244	#[tokio::test]
2245	async fn test_initial_sync_burst_single_peer() {
2246		let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);
2247
2248		// Create 20MB of statements (200 statements x 100KB each)
2249		// Using 100KB ensures ~10 statements per 1MB batch, requiring ~20 bursts
2250		let num_statements = 200;
2251		let statement_size = 100 * 1024; // 100KB per statement
2252		let mut expected_hashes = Vec::new();
2253		for i in 0..num_statements {
2254			let mut statement = Statement::new();
2255			let mut data = vec![0u8; statement_size];
2256			// Use multiple bytes for uniqueness since we have >255 statements
2257			data[0] = (i % 256) as u8;
2258			data[1] = (i / 256) as u8;
2259			statement.set_plain_data(data);
2260			let hash = statement.hash();
2261			expected_hashes.push(hash);
2262			statement_store.statements.lock().unwrap().insert(hash, statement);
2263		}
2264
2265		// Setup peer and simulate connection
2266		let peer_id = PeerId::random();
2267
2268		handler
2269			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2270				peer: peer_id,
2271				direction: sc_network::service::traits::Direction::Inbound,
2272				handshake: vec![],
2273				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2274			})
2275			.await;
2276
2277		// Verify peer was added and initial sync was queued
2278		assert!(handler.peers.contains_key(&peer_id));
2279		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2280		assert_eq!(handler.initial_sync_peer_queue.len(), 1);
2281
2282		// Process bursts until all statements are sent
2283		let mut burst_count = 0;
2284		while handler.pending_initial_syncs.contains_key(&peer_id) {
2285			handler.process_initial_sync_burst().await;
2286			burst_count += 1;
2287			// Safety limit
2288			assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
2289		}
2290
2291		// Verify multiple bursts were needed
2292		// With 200 statements x 100KB each and ~1MB per batch, we expect many bursts
2293		assert!(
2294			burst_count >= 10,
2295			"Expected multiple bursts for 200 statements of 100KB each, got {}",
2296			burst_count
2297		);
2298
2299		// Verify all statements were sent
2300		let sent = notification_service.get_sent_notifications();
2301		let mut sent_hashes: Vec<_> = sent
2302			.iter()
2303			.flat_map(|(peer, notification)| {
2304				assert_eq!(*peer, peer_id);
2305				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2306			})
2307			.map(|s| s.hash())
2308			.collect();
2309		sent_hashes.sort();
2310		expected_hashes.sort();
2311
2312		assert_eq!(
2313			sent_hashes.len(),
2314			expected_hashes.len(),
2315			"Expected {} statements to be sent, got {}",
2316			expected_hashes.len(),
2317			sent_hashes.len()
2318		);
2319		assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
2320
2321		// Verify cleanup
2322		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
2323		assert!(handler.initial_sync_peer_queue.is_empty());
2324	}
2325
2326	#[tokio::test]
2327	async fn test_initial_sync_burst_multiple_peers_round_robin() {
2328		let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);
2329
2330		// Create 20MB of statements (200 statements x 100KB each)
2331		let num_statements = 200;
2332		let statement_size = 100 * 1024; // 100KB per statement
2333		let mut expected_hashes = Vec::new();
2334		for i in 0..num_statements {
2335			let mut statement = Statement::new();
2336			let mut data = vec![0u8; statement_size];
2337			data[0] = (i % 256) as u8;
2338			data[1] = (i / 256) as u8;
2339			statement.set_plain_data(data);
2340			let hash = statement.hash();
2341			expected_hashes.push(hash);
2342			statement_store.statements.lock().unwrap().insert(hash, statement);
2343		}
2344
2345		// Setup 3 peers and simulate connections
2346		let peer1 = PeerId::random();
2347		let peer2 = PeerId::random();
2348		let peer3 = PeerId::random();
2349
2350		// Connect peers
2351		for peer in [peer1, peer2, peer3] {
2352			handler
2353				.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2354					peer,
2355					direction: sc_network::service::traits::Direction::Inbound,
2356					handshake: vec![],
2357					negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2358				})
2359				.await;
2360		}
2361
2362		// Verify all peers were added and initial syncs were queued
2363		assert_eq!(handler.peers.len(), 3);
2364		assert_eq!(handler.pending_initial_syncs.len(), 3);
2365		assert_eq!(handler.initial_sync_peer_queue.len(), 3);
2366
2367		// Track which peer was processed on each burst for round-robin verification
2368		let mut peer_burst_order = Vec::new();
2369		let mut burst_count = 0;
2370
2371		while !handler.pending_initial_syncs.is_empty() {
2372			// Record which peer will be processed next
2373			if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
2374				peer_burst_order.push(next_peer);
2375			}
2376			handler.process_initial_sync_burst().await;
2377			burst_count += 1;
2378			// Safety limit
2379			assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
2380		}
2381
2382		// Verify multiple bursts were needed
2383		// With 3 peers and many bursts per peer, we expect many bursts total
2384		assert!(
2385			burst_count >= 30,
2386			"Expected many bursts for 3 peers with 200 statements each, got {}",
2387			burst_count
2388		);
2389
2390		// Verify round-robin pattern in first 9 bursts (3 peers x 3 rounds)
2391		assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
2392		// First round
2393		assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
2394		assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
2395		assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
2396		// Second round
2397		assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
2398		assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
2399		assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
2400
2401		// Verify all peers received all statements
2402		let sent = notification_service.get_sent_notifications();
2403		let mut peer1_hashes = get_peer_hashes(&sent, peer1);
2404		let mut peer2_hashes = get_peer_hashes(&sent, peer2);
2405		let mut peer3_hashes = get_peer_hashes(&sent, peer3);
2406
2407		peer1_hashes.sort();
2408		peer2_hashes.sort();
2409		peer3_hashes.sort();
2410		expected_hashes.sort();
2411
2412		assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
2413		assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
2414		assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
2415
2416		// Verify cleanup
2417		assert!(handler.pending_initial_syncs.is_empty());
2418		assert!(handler.initial_sync_peer_queue.is_empty());
2419	}
2420
2421	#[tokio::test]
2422	async fn test_send_statements_in_chunks_exact_max_size() {
2423		let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
2424			build_handler(1);
2425
2426		// Calculate the data sizes so that 100 statements together exactly fill max_size.
2427		// This tests that all 100 statements fit in a single notification.
2428		//
2429		// The limit check in find_sendable_chunk is:
2430		//   max_size = MAX_STATEMENT_NOTIFICATION_SIZE - Compact::<u32>::max_encoded_len()
2431		//
2432		// Statement encoding (encodes as Vec<Field>):
2433		// - Compact<u32> for number of fields (1 byte for value 2: expiry + data)
2434		// - Field::Expiry discriminant (1 byte, value 2)
2435		// - u64 expiry value (8 bytes)
2436		// - Field::Data discriminant (1 byte, value 8)
2437		// - Compact<u32> for the data length (2 bytes for small data)
2438		// So per-statement overhead = 1 + 1 + 8 + 1 + 2 = 13 bytes
2439		let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
2440		let num_statements: usize = 100;
2441		let per_statement_overhead = 1 + 1 + 8 + 1 + 2; // Vec<Field> length + expiry field + data discriminant + Compact data length
2442		let total_overhead = per_statement_overhead * num_statements;
2443		let total_data_size = max_size - total_overhead;
2444		let per_statement_data_size = total_data_size / num_statements;
2445		let remainder = total_data_size % num_statements;
2446
2447		let mut expected_hashes = Vec::with_capacity(num_statements);
2448		let mut total_encoded_size = 0;
2449
2450		for i in 0..num_statements {
2451			let mut statement = Statement::new();
2452			// Distribute remainder across first `remainder` statements to exactly fill max_size
2453			let extra = if i < remainder { 1 } else { 0 };
2454			let mut data = vec![42u8; per_statement_data_size + extra];
2455			// Make each statement unique by modifying the first few bytes
2456			data[0] = i as u8;
2457			data[1] = (i >> 8) as u8;
2458			statement.set_plain_data(data);
2459
2460			total_encoded_size += statement.encoded_size();
2461
2462			let hash = statement.hash();
2463			expected_hashes.push(hash);
2464			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
2465		}
2466
2467		// Verify our calculation: total encoded size should be <= max_size
2468		assert!(
2469			total_encoded_size == max_size,
2470			"Total encoded size {} should be <= max_size {}",
2471			total_encoded_size,
2472			max_size
2473		);
2474
2475		handler.propagate_statements().await;
2476
2477		let sent = notification_service.get_sent_notifications();
2478
2479		// All statements should fit in a single chunk
2480		assert_eq!(
2481			sent.len(),
2482			1,
2483			"Expected 1 notification for all {} statements, but got {}",
2484			num_statements,
2485			sent.len()
2486		);
2487
2488		let (_peer, notification) = &sent[0];
2489		assert!(
2490			notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
2491			"Notification size {} exceeds limit {}",
2492			notification.len(),
2493			MAX_STATEMENT_NOTIFICATION_SIZE
2494		);
2495
2496		let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
2497		assert_eq!(
2498			decoded.len(),
2499			num_statements,
2500			"Expected {} statements in the notification",
2501			num_statements
2502		);
2503
2504		// Verify all statements were sent (order may differ due to HashMap iteration)
2505		let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
2506		expected_hashes.sort();
2507		received_hashes.sort();
2508		assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
2509	}
2510
2511	#[tokio::test]
2512	async fn test_initial_sync_burst_size_limit_consistency() {
2513		// This test verifies that process_initial_sync_burst and find_sendable_chunk
2514		// use the same size limit (max_statement_payload_size).
2515		//
2516		// Previously there was a bug where the filter in process_initial_sync_burst used
2517		// MAX_STATEMENT_NOTIFICATION_SIZE, but find_sendable_chunk reserved extra space
2518		// for Compact::<u32>::max_encoded_len(). This caused a debug_assert failure when
2519		// statements fit the filter but not find_sendable_chunk.
2520		//
2521		// With the fix, both use max_statement_payload_size(), so the filter will reject
2522		// statements that wouldn't fit in find_sendable_chunk.
2523		let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);
2524
2525		// This peer connects as V1 (see negotiated_fallback below).
2526		let payload_limit = max_statement_payload_size(V1_ENVELOPE_OVERHEAD);
2527
2528		// Create first statement that's just over half the payload limit
2529		let first_stmt_data_size = payload_limit / 2 + 10;
2530		let mut stmt1 = Statement::new();
2531		stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
2532		let stmt1_encoded_size = stmt1.encoded_size();
2533
2534		// Create second statement that, combined with the first, exceeds the payload limit.
2535		// This means the filter will only accept the first statement.
2536		let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
2537		let target_stmt2_encoded = remaining + 3; // 3 bytes over limit when combined
2538		let stmt2_data_size = target_stmt2_encoded.saturating_sub(4); // ~4 bytes encoding overhead
2539		let mut stmt2 = Statement::new();
2540		stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
2541		let stmt2_encoded_size = stmt2.encoded_size();
2542
2543		let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
2544
2545		// Verify our setup: total exceeds payload limit
2546		assert!(
2547			total_encoded > payload_limit,
2548			"Total {} should exceed payload_limit {} so filter rejects second statement",
2549			total_encoded,
2550			payload_limit
2551		);
2552
2553		let hash1 = stmt1.hash();
2554		let hash2 = stmt2.hash();
2555		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
2556		statement_store.statements.lock().unwrap().insert(hash2, stmt2);
2557
2558		// Setup peer and simulate connection
2559		let peer_id = PeerId::random();
2560
2561		handler
2562			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2563				peer: peer_id,
2564				direction: sc_network::service::traits::Direction::Inbound,
2565				handshake: vec![],
2566				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2567			})
2568			.await;
2569
2570		// Verify initial sync was queued with both hashes
2571		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2572		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
2573
2574		// Process first burst - should send only one statement (the other doesn't fit)
2575		handler.process_initial_sync_burst().await;
2576
2577		// With the fix, the filter and find_sendable_chunk use the same limit,
2578		// so no assertion failure occurs. Only one statement is fetched and sent.
2579		let sent = notification_service.get_sent_notifications();
2580		assert_eq!(sent.len(), 1, "First burst should send one notification");
2581
2582		let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
2583		assert_eq!(decoded.len(), 1, "First notification should contain one statement");
2584
2585		// Verify one of the two statements was sent (order is non-deterministic due to HashMap)
2586		let sent_hash = decoded[0].hash();
2587		assert!(
2588			sent_hash == hash1 || sent_hash == hash2,
2589			"Sent statement should be one of the two created"
2590		);
2591
2592		// Second statement should still be pending
2593		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2594		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
2595
2596		// Process second burst - should send the remaining statement
2597		handler.process_initial_sync_burst().await;
2598
2599		let sent = notification_service.get_sent_notifications();
2600		assert_eq!(sent.len(), 2, "Second burst should send another notification");
2601
2602		// Both statements should now be sent
2603		let mut sent_hashes: Vec<_> = sent
2604			.iter()
2605			.flat_map(|(_, notification)| {
2606				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2607			})
2608			.map(|s| s.hash())
2609			.collect();
2610		sent_hashes.sort();
2611		let mut expected_hashes = vec![hash1, hash2];
2612		expected_hashes.sort();
2613		assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
2614
2615		// No more pending
2616		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
2617	}
2618
2619	#[tokio::test]
2620	async fn test_peer_disconnected_on_flooding() {
2621		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2622			build_handler(1);
2623
2624		let peer_id = *handler.peers.keys().next().unwrap();
2625
2626		let mut flood_statements = Vec::new();
2627		for i in 0..600_000 {
2628			let mut statement = Statement::new();
2629			statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2630			flood_statements.push(statement);
2631		}
2632
2633		handler.on_statements(peer_id, flood_statements);
2634
2635		let reports = network.get_reports();
2636		assert!(
2637			reports
2638				.iter()
2639				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2640			"Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2641			reports
2642		);
2643
2644		let disconnected = network.get_disconnected_peers();
2645		assert!(
2646			disconnected.contains(&peer_id),
2647			"Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2648			peer_id,
2649			disconnected
2650		);
2651
2652		dispatch_disconnects(&mut handler, &network).await;
2653
2654		// Verify peer state was cleaned up
2655		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2656		assert!(
2657			!handler.pending_initial_syncs.contains_key(&peer_id),
2658			"Peer should be removed from pending_initial_syncs"
2659		);
2660		assert!(
2661			!handler.initial_sync_peer_queue.contains(&peer_id),
2662			"Peer should be removed from initial_sync_peer_queue"
2663		);
2664	}
2665
2666	#[tokio::test]
2667	async fn test_legitimate_traffic_not_flagged() {
2668		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2669			build_handler(1);
2670
2671		let peer_id = *handler.peers.keys().next().unwrap();
2672
2673		let start = std::time::Instant::now();
2674		let duration = std::time::Duration::from_secs(5);
2675		let mut counter = 0u32;
2676
2677		while start.elapsed() < duration {
2678			let mut statements = Vec::new();
2679			for i in 0..5_000 {
2680				let mut statement = Statement::new();
2681				statement.set_plain_data(vec![
2682					counter as u8,
2683					(counter >> 8) as u8,
2684					(counter >> 16) as u8,
2685					i as u8,
2686				]);
2687				statements.push(statement);
2688				counter = counter.wrapping_add(1);
2689			}
2690
2691			handler.on_statements(peer_id, statements);
2692
2693			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2694		}
2695
2696		let reports = network.get_reports();
2697		assert!(
2698			!reports
2699				.iter()
2700				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2701			"Legitimate traffic should not trigger flooding detection. Reports: {:?}",
2702			reports
2703		);
2704
2705		let disconnected = network.get_disconnected_peers();
2706		assert!(
2707			!disconnected.contains(&peer_id),
2708			"Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
2709			disconnected
2710		);
2711
2712		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
2713	}
2714
2715	#[tokio::test]
2716	async fn test_just_over_rate_limit_triggers_flooding() {
2717		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2718			build_handler(1);
2719
2720		let peer_id = *handler.peers.keys().next().unwrap();
2721
2722		let mut statements = Vec::new();
2723		for i in 0..260_000 {
2724			let mut statement = Statement::new();
2725			statement.set_plain_data(vec![
2726				i as u8,
2727				(i >> 8) as u8,
2728				(i >> 16) as u8,
2729				(i >> 24) as u8,
2730			]);
2731			statements.push(statement);
2732		}
2733
2734		handler.on_statements(peer_id, statements);
2735
2736		let reports = network.get_reports();
2737		let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2738		assert!(
2739			reports
2740				.iter()
2741				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2742			"Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2743			expected_burst,
2744			reports
2745		);
2746
2747		let disconnected = network.get_disconnected_peers();
2748		assert!(
2749			disconnected.contains(&peer_id),
2750			"Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2751			disconnected
2752		);
2753
2754		dispatch_disconnects(&mut handler, &network).await;
2755
2756		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2757	}
2758
2759	#[tokio::test]
2760	async fn test_burst_of_250k_statements_allowed() {
2761		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2762			build_handler(1);
2763
2764		let peer_id = *handler.peers.keys().next().unwrap();
2765
2766		let mut statements = Vec::new();
2767		for i in 0..250_000 {
2768			let mut statement = Statement::new();
2769			statement.set_plain_data(vec![
2770				i as u8,
2771				(i >> 8) as u8,
2772				(i >> 16) as u8,
2773				(i >> 24) as u8,
2774			]);
2775			statements.push(statement);
2776		}
2777
2778		handler.on_statements(peer_id, statements);
2779
2780		let reports = network.get_reports();
2781		assert!(
2782			!reports
2783				.iter()
2784				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2785			"250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2786			reports
2787		);
2788
2789		assert!(
2790			handler.peers.contains_key(&peer_id),
2791			"Peer should still be connected after 250k burst"
2792		);
2793	}
2794
2795	#[tokio::test]
2796	async fn test_sustained_rate_above_limit_triggers_flooding() {
2797		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2798			build_handler(1);
2799
2800		let peer_id = *handler.peers.keys().next().unwrap();
2801
2802		let mut counter = 0u32;
2803
2804		let start = std::time::Instant::now();
2805		let duration = std::time::Duration::from_secs(5);
2806
2807		let mut flooding_detected = false;
2808		while start.elapsed() < duration {
2809			let mut statements = Vec::new();
2810			for i in 0..30_000 {
2811				let mut statement = Statement::new();
2812				statement.set_plain_data(vec![
2813					counter as u8,
2814					(counter >> 8) as u8,
2815					(counter >> 16) as u8,
2816					i as u8,
2817				]);
2818				statements.push(statement);
2819				counter = counter.wrapping_add(1);
2820			}
2821
2822			handler.on_statements(peer_id, statements);
2823
2824			// Check if flooding was detected
2825			let reports = network.get_reports();
2826			if reports
2827				.iter()
2828				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
2829			{
2830				flooding_detected = true;
2831				break;
2832			}
2833
2834			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2835		}
2836
2837		assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
2838
2839		let disconnected = network.get_disconnected_peers();
2840		assert!(
2841			disconnected.contains(&peer_id),
2842			"Peer should be disconnected after sustained high rate. Disconnected: {:?}",
2843			disconnected
2844		);
2845
2846		dispatch_disconnects(&mut handler, &network).await;
2847
2848		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2849	}
2850
2851	#[tokio::test]
2852	async fn test_v2_peer_detected_when_no_fallback() {
2853		let (mut handler, _statement_store, _network, _notification_service) =
2854			build_handler_no_peers();
2855
2856		let peer_id = PeerId::random();
2857
2858		// No negotiated_fallback means the peer connected on the main protocol (v2).
2859		handler
2860			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2861				peer: peer_id,
2862				direction: sc_network::service::traits::Direction::Inbound,
2863				handshake: vec![],
2864				negotiated_fallback: None,
2865			})
2866			.await;
2867
2868		assert_eq!(
2869			handler.peers.get(&peer_id).unwrap().protocol_version,
2870			PeerProtocolVersion::V2,
2871			"Peer should be detected as v2 when no fallback is negotiated"
2872		);
2873	}
2874
2875	#[tokio::test]
2876	async fn test_v1_peer_detected_when_fallback_negotiated() {
2877		let (mut handler, _statement_store, _network, _notification_service) =
2878			build_handler_no_peers();
2879
2880		let peer_id = PeerId::random();
2881
2882		// negotiated_fallback is Some means the peer fell back to v1.
2883		handler
2884			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2885				peer: peer_id,
2886				direction: sc_network::service::traits::Direction::Inbound,
2887				handshake: vec![],
2888				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2889			})
2890			.await;
2891
2892		assert_eq!(
2893			handler.peers.get(&peer_id).unwrap().protocol_version,
2894			PeerProtocolVersion::V1,
2895			"Peer should be detected as v1 when fallback is negotiated"
2896		);
2897	}
2898
2899	#[tokio::test]
2900	async fn test_v1_peer_decodes_raw_statements() {
2901		let (mut handler, _statement_store, _network, _notification_service) =
2902			build_handler_no_peers();
2903
2904		let peer_id = PeerId::random();
2905		let (queue_sender, queue_receiver) = async_channel::bounded(10);
2906		handler.queue_sender = queue_sender;
2907
2908		// Connect peer as v1 (with fallback).
2909		handler
2910			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2911				peer: peer_id,
2912				direction: sc_network::service::traits::Direction::Inbound,
2913				handshake: vec![],
2914				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2915			})
2916			.await;
2917
2918		// V1 peer sends raw Vec<Statement>.
2919		let mut statement = Statement::new();
2920		statement.set_plain_data(b"v1 statement".to_vec());
2921		let hash = statement.hash();
2922		let raw_encoded = vec![statement].encode();
2923
2924		handler
2925			.handle_notification_event(NotificationEvent::NotificationReceived {
2926				peer: peer_id,
2927				notification: raw_encoded.into(),
2928			})
2929			.await;
2930
2931		let (received, _) = queue_receiver.try_recv().unwrap();
2932		assert_eq!(received.hash(), hash, "V1 peer's raw statement should be decoded correctly");
2933	}
2934
2935	#[tokio::test]
2936	async fn test_v2_peer_decodes_statement_message() {
2937		let (mut handler, _statement_store, _network, _notification_service) =
2938			build_handler_no_peers();
2939
2940		let peer_id = PeerId::random();
2941		let (queue_sender, queue_receiver) = async_channel::bounded(10);
2942		handler.queue_sender = queue_sender;
2943
2944		// Connect peer as v2 (no fallback).
2945		handler
2946			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2947				peer: peer_id,
2948				direction: sc_network::service::traits::Direction::Inbound,
2949				handshake: vec![],
2950				negotiated_fallback: None,
2951			})
2952			.await;
2953
2954		// V2 peer sends StatementMessage::Statements.
2955		let mut statement = Statement::new();
2956		statement.set_plain_data(b"v2 statement".to_vec());
2957		let hash = statement.hash();
2958		let msg = StatementMessage::Statements(vec![statement]);
2959		let encoded = msg.encode();
2960
2961		handler
2962			.handle_notification_event(NotificationEvent::NotificationReceived {
2963				peer: peer_id,
2964				notification: encoded.into(),
2965			})
2966			.await;
2967
2968		let (received, _) = queue_receiver.try_recv().unwrap();
2969		assert_eq!(received.hash(), hash, "V2 peer's StatementMessage should be decoded correctly");
2970	}
2971
2972	#[tokio::test]
2973	async fn test_v2_peer_topic_affinity_stored() {
2974		let (mut handler, _statement_store, _network, _notification_service) =
2975			build_handler_no_peers();
2976
2977		let peer_id = PeerId::random();
2978
2979		// Connect peer as v2.
2980		handler
2981			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2982				peer: peer_id,
2983				direction: sc_network::service::traits::Direction::Inbound,
2984				handshake: vec![],
2985				negotiated_fallback: None,
2986			})
2987			.await;
2988
2989		assert!(
2990			handler.peers.get(&peer_id).unwrap().topic_affinity.is_none(),
2991			"Topic affinity should be None initially"
2992		);
2993
2994		// Send ExplicitTopicAffinity message.
2995		let topic: [u8; 32] = [0xAA; 32];
2996		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
2997		filter.insert(&topic);
2998		let msg = StatementMessage::ExplicitTopicAffinity(filter);
2999		let encoded = msg.encode();
3000
3001		handler
3002			.handle_notification_event(NotificationEvent::NotificationReceived {
3003				peer: peer_id,
3004				notification: encoded.into(),
3005			})
3006			.await;
3007
3008		// Affinity is deferred; process it.
3009		handler.process_pending_affinities();
3010
3011		let peer_data = handler.peers.get(&peer_id).unwrap();
3012		assert!(
3013			peer_data.topic_affinity.is_some(),
3014			"Topic affinity should be set after receiving ExplicitTopicAffinity"
3015		);
3016		// The filter should match the topic we inserted.
3017		assert!(
3018			peer_data.topic_affinity.as_ref().unwrap().contains(&topic),
3019			"Stored affinity filter should match the topic"
3020		);
3021	}
3022
3023	#[tokio::test]
3024	async fn test_topic_affinity_filters_propagation() {
3025		let (mut handler, statement_store, _network, notification_service) =
3026			build_handler_no_peers();
3027
3028		let peer_id = PeerId::random();
3029
3030		// Connect peer as v2.
3031		handler
3032			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3033				peer: peer_id,
3034				direction: sc_network::service::traits::Direction::Inbound,
3035				handshake: vec![],
3036				negotiated_fallback: None,
3037			})
3038			.await;
3039
3040		// Set up topic affinity: peer is interested in topic 0xAA only.
3041		let topic_aa: [u8; 32] = [0xAA; 32];
3042		let topic_bb: [u8; 32] = [0xBB; 32];
3043		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
3044		filter.insert(&topic_aa);
3045		let msg = StatementMessage::ExplicitTopicAffinity(filter);
3046		let encoded = msg.encode();
3047		handler
3048			.handle_notification_event(NotificationEvent::NotificationReceived {
3049				peer: peer_id,
3050				notification: encoded.into(),
3051			})
3052			.await;
3053
3054		// Affinity is deferred; process it.
3055		handler.process_pending_affinities();
3056
3057		// Create statements: one matching, one not matching, one with no topics.
3058		let mut stmt_matching = Statement::new();
3059		stmt_matching.set_plain_data(b"matching".to_vec());
3060		stmt_matching.set_topic(0, topic_aa.into());
3061		let hash_matching = stmt_matching.hash();
3062
3063		let mut stmt_not_matching = Statement::new();
3064		stmt_not_matching.set_plain_data(b"not matching".to_vec());
3065		stmt_not_matching.set_topic(0, topic_bb.into());
3066		let hash_not_matching = stmt_not_matching.hash();
3067
3068		let mut stmt_no_topic = Statement::new();
3069		stmt_no_topic.set_plain_data(b"no topic".to_vec());
3070		let hash_no_topic = stmt_no_topic.hash();
3071
3072		statement_store
3073			.recent_statements
3074			.lock()
3075			.unwrap()
3076			.insert(hash_matching, stmt_matching);
3077		statement_store
3078			.recent_statements
3079			.lock()
3080			.unwrap()
3081			.insert(hash_not_matching, stmt_not_matching);
3082		statement_store
3083			.recent_statements
3084			.lock()
3085			.unwrap()
3086			.insert(hash_no_topic, stmt_no_topic);
3087
3088		handler.propagate_statements().await;
3089
3090		let sent = notification_service.get_sent_notifications();
3091		let mut sent_hashes: Vec<_> = sent
3092			.iter()
3093			.flat_map(|(_, notification)| {
3094				// V2 peer gets StatementMessage encoding.
3095				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
3096					StatementMessage::Statements(stmts) => stmts,
3097					_ => panic!("Expected StatementMessage::Statements"),
3098				}
3099			})
3100			.map(|s| s.hash())
3101			.collect();
3102		sent_hashes.sort();
3103
3104		// Matching and no-topic statements should be sent; non-matching should be filtered.
3105		assert!(
3106			sent_hashes.contains(&hash_matching),
3107			"Statement matching topic affinity should be propagated"
3108		);
3109		assert!(
3110			sent_hashes.contains(&hash_no_topic),
3111			"Statement with no topics should be propagated (broadcast)"
3112		);
3113		assert!(
3114			!sent_hashes.contains(&hash_not_matching),
3115			"Statement NOT matching topic affinity should be filtered out"
3116		);
3117	}
3118
3119	#[tokio::test]
3120	async fn test_v1_peer_no_topic_filtering() {
3121		let (mut handler, statement_store, _network, notification_service) =
3122			build_handler_no_peers();
3123
3124		let peer_id = PeerId::random();
3125
3126		// Connect peer as v1 (with fallback).
3127		handler
3128			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3129				peer: peer_id,
3130				direction: sc_network::service::traits::Direction::Inbound,
3131				handshake: vec![],
3132				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
3133			})
3134			.await;
3135
3136		// V1 peers have no topic affinity - all statements should be propagated.
3137		let topic_aa: [u8; 32] = [0xAA; 32];
3138		let mut stmt_with_topic = Statement::new();
3139		stmt_with_topic.set_plain_data(b"with topic".to_vec());
3140		stmt_with_topic.set_topic(0, topic_aa.into());
3141		let hash_with_topic = stmt_with_topic.hash();
3142
3143		let mut stmt_no_topic = Statement::new();
3144		stmt_no_topic.set_plain_data(b"no topic".to_vec());
3145		let hash_no_topic = stmt_no_topic.hash();
3146
3147		statement_store
3148			.recent_statements
3149			.lock()
3150			.unwrap()
3151			.insert(hash_with_topic, stmt_with_topic);
3152		statement_store
3153			.recent_statements
3154			.lock()
3155			.unwrap()
3156			.insert(hash_no_topic, stmt_no_topic);
3157
3158		handler.propagate_statements().await;
3159
3160		let sent = notification_service.get_sent_notifications();
3161		let sent_hashes: Vec<_> = sent
3162			.iter()
3163			.flat_map(|(_, notification)| {
3164				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
3165			})
3166			.map(|s| s.hash())
3167			.collect();
3168
3169		assert_eq!(
3170			sent_hashes.len(),
3171			2,
3172			"V1 peer should receive all statements regardless of topics"
3173		);
3174		assert!(sent_hashes.contains(&hash_with_topic));
3175		assert!(sent_hashes.contains(&hash_no_topic));
3176	}
3177
3178	#[tokio::test]
3179	async fn test_affinity_change_triggers_resync() {
3180		let (mut handler, statement_store, _network, notification_service) =
3181			build_handler_no_peers_light();
3182
3183		let peer_id = PeerId::random();
3184
3185		// Add statements with different topics to the store.
3186		let topic_aa: [u8; 32] = [0xAA; 32];
3187		let topic_bb: [u8; 32] = [0xBB; 32];
3188
3189		let mut stmt_aa = Statement::new();
3190		stmt_aa.set_plain_data(b"stmt_aa".to_vec());
3191		stmt_aa.set_topic(0, topic_aa.into());
3192		let hash_aa = stmt_aa.hash();
3193
3194		let mut stmt_bb = Statement::new();
3195		stmt_bb.set_plain_data(b"stmt_bb".to_vec());
3196		stmt_bb.set_topic(0, topic_bb.into());
3197		let hash_bb = stmt_bb.hash();
3198
3199		let mut stmt_no_topic = Statement::new();
3200		stmt_no_topic.set_plain_data(b"no topic".to_vec());
3201		let hash_no_topic = stmt_no_topic.hash();
3202
3203		statement_store.statements.lock().unwrap().insert(hash_aa, stmt_aa);
3204		statement_store.statements.lock().unwrap().insert(hash_bb, stmt_bb);
3205		statement_store.statements.lock().unwrap().insert(hash_no_topic, stmt_no_topic);
3206
3207		// Connect peer as v2.
3208		handler
3209			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3210				peer: peer_id,
3211				direction: sc_network::service::traits::Direction::Inbound,
3212				handshake: vec![],
3213				negotiated_fallback: None,
3214			})
3215			.await;
3216
3217		// Light V2 peers should NOT get initial sync on connect (must set affinity first).
3218		assert!(
3219			!handler.pending_initial_syncs.contains_key(&peer_id),
3220			"Light V2 peer should NOT have initial sync scheduled on connect"
3221		);
3222
3223		// Set topic affinity to topic_aa — this triggers the first initial sync.
3224		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
3225		filter.insert(&topic_aa);
3226		let msg = StatementMessage::ExplicitTopicAffinity(filter);
3227		let encoded = msg.encode();
3228		handler
3229			.handle_notification_event(NotificationEvent::NotificationReceived {
3230				peer: peer_id,
3231				notification: encoded.into(),
3232			})
3233			.await;
3234
3235		// Affinity is deferred; process it.
3236		handler.process_pending_affinities();
3237
3238		assert!(
3239			handler.pending_initial_syncs.contains_key(&peer_id),
3240			"Initial sync should be scheduled after setting affinity"
3241		);
3242
3243		// Drain initial sync — only stmt_aa and stmt_no_topic should be sent.
3244		while handler.pending_initial_syncs.contains_key(&peer_id) {
3245			handler.process_initial_sync_burst().await;
3246		}
3247
3248		let sent = notification_service.get_sent_notifications();
3249		let sent_hashes: HashSet<_> = sent
3250			.iter()
3251			.flat_map(|(_, notification)| {
3252				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
3253					StatementMessage::Statements(stmts) => stmts,
3254					_ => panic!("Expected StatementMessage::Statements"),
3255				}
3256			})
3257			.map(|s| s.hash())
3258			.collect();
3259		assert!(sent_hashes.contains(&hash_aa), "stmt_aa should be sent (matches affinity)");
3260		assert!(
3261			sent_hashes.contains(&hash_no_topic),
3262			"stmt_no_topic should be sent (broadcast, no topic)"
3263		);
3264		assert!(!sent_hashes.contains(&hash_bb), "stmt_bb should NOT be sent (filtered)");
3265
3266		// Now change affinity to topic_bb — triggers re-sync.
3267		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
3268		filter.insert(&topic_bb);
3269		let msg = StatementMessage::ExplicitTopicAffinity(filter);
3270		let encoded = msg.encode();
3271		handler
3272			.handle_notification_event(NotificationEvent::NotificationReceived {
3273				peer: peer_id,
3274				notification: encoded.into(),
3275			})
3276			.await;
3277
3278		// Affinity is deferred; process it.
3279		handler.process_pending_affinities();
3280
3281		assert!(
3282			handler.pending_initial_syncs.contains_key(&peer_id),
3283			"Initial sync should be re-scheduled after affinity change"
3284		);
3285
3286		notification_service.clear_sent_notifications();
3287		while handler.pending_initial_syncs.contains_key(&peer_id) {
3288			handler.process_initial_sync_burst().await;
3289		}
3290
3291		let sent_after_bb = notification_service.get_sent_notifications();
3292		let sent_hashes_bb: HashSet<_> = sent_after_bb
3293			.iter()
3294			.flat_map(|(_, notification)| {
3295				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
3296					StatementMessage::Statements(stmts) => stmts,
3297					_ => panic!("Expected StatementMessage::Statements"),
3298				}
3299			})
3300			.map(|s| s.hash())
3301			.collect();
3302		// stmt_bb was previously filtered and should now be sent.
3303		assert!(
3304			sent_hashes_bb.contains(&hash_bb),
3305			"stmt_bb should now be sent after affinity changed to topic_bb"
3306		);
3307		// Known statements are redelivered on affinity change.
3308		assert!(
3309			sent_hashes_bb.contains(&hash_no_topic),
3310			"stmt_no_topic should be re-sent (known_statements cleared on affinity change)"
3311		);
3312	}
3313
3314	#[tokio::test]
3315	async fn test_affinity_change_sends_previously_filtered_statements() {
3316		// This tests the scenario where:
3317		// 1. Peer connects and immediately sets affinity (before initial sync).
3318		// 2. Statements not matching the initial affinity are NOT marked as known.
3319		// 3. When affinity changes to include those topics, they ARE sent.
3320		let (mut handler, statement_store, _network, notification_service) =
3321			build_handler_no_peers_light();
3322
3323		let peer_id = PeerId::random();
3324
3325		let topic_aa: [u8; 32] = [0xAA; 32];
3326		let topic_bb: [u8; 32] = [0xBB; 32];
3327
3328		let mut stmt_aa = Statement::new();
3329		stmt_aa.set_plain_data(b"stmt_aa".to_vec());
3330		stmt_aa.set_topic(0, topic_aa.into());
3331		let hash_aa = stmt_aa.hash();
3332
3333		let mut stmt_bb = Statement::new();
3334		stmt_bb.set_plain_data(b"stmt_bb".to_vec());
3335		stmt_bb.set_topic(0, topic_bb.into());
3336		let hash_bb = stmt_bb.hash();
3337
3338		statement_store.statements.lock().unwrap().insert(hash_aa, stmt_aa.clone());
3339		statement_store.statements.lock().unwrap().insert(hash_bb, stmt_bb.clone());
3340
3341		// Also put them in recent_statements so propagate_statements can find them.
3342		statement_store.recent_statements.lock().unwrap().insert(hash_aa, stmt_aa);
3343		statement_store.recent_statements.lock().unwrap().insert(hash_bb, stmt_bb);
3344
3345		// Connect peer as v2.
3346		handler
3347			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3348				peer: peer_id,
3349				direction: sc_network::service::traits::Direction::Inbound,
3350				handshake: vec![],
3351				negotiated_fallback: None,
3352			})
3353			.await;
3354
3355		// Immediately set affinity to topic_aa BEFORE any initial sync runs.
3356		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
3357		filter.insert(&topic_aa);
3358		let msg = StatementMessage::ExplicitTopicAffinity(filter);
3359		let encoded = msg.encode();
3360		handler
3361			.handle_notification_event(NotificationEvent::NotificationReceived {
3362				peer: peer_id,
3363				notification: encoded.into(),
3364			})
3365			.await;
3366
3367		// Affinity is deferred; process it.
3368		handler.process_pending_affinities();
3369
3370		// Drain initial sync — should only send stmt_aa (matches affinity).
3371		while handler.pending_initial_syncs.contains_key(&peer_id) {
3372			handler.process_initial_sync_burst().await;
3373		}
3374
3375		let sent = notification_service.get_sent_notifications();
3376		let sent_hashes: HashSet<_> = sent
3377			.iter()
3378			.flat_map(|(_, notification)| {
3379				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
3380					StatementMessage::Statements(stmts) => stmts,
3381					_ => panic!("Expected StatementMessage::Statements"),
3382				}
3383			})
3384			.map(|s| s.hash())
3385			.collect();
3386		assert!(sent_hashes.contains(&hash_aa), "stmt_aa should be sent (matches affinity)");
3387		assert!(
3388			!sent_hashes.contains(&hash_bb),
3389			"stmt_bb should NOT be sent (filtered by affinity)"
3390		);
3391
3392		// Now propagate_statements — stmt_bb should be filtered by affinity and NOT marked as
3393		// known.
3394		handler.propagate_statements().await;
3395
3396		// Verify stmt_bb was NOT marked as known (the bug fix).
3397		let peer = handler.peers.get(&peer_id).unwrap();
3398		assert!(
3399			!peer.known_statements.contains(&hash_bb),
3400			"stmt_bb should NOT be in known_statements (filtered by affinity)"
3401		);
3402		assert!(peer.known_statements.contains(&hash_aa), "stmt_aa should be in known_statements");
3403
3404		// Now change affinity to include topic_bb.
3405		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
3406		filter.insert(&topic_aa);
3407		filter.insert(&topic_bb);
3408		let msg = StatementMessage::ExplicitTopicAffinity(filter);
3409		let encoded = msg.encode();
3410
3411		notification_service.clear_sent_notifications();
3412		handler
3413			.handle_notification_event(NotificationEvent::NotificationReceived {
3414				peer: peer_id,
3415				notification: encoded.into(),
3416			})
3417			.await;
3418
3419		// Affinity is deferred; process it.
3420		handler.process_pending_affinities();
3421
3422		// Drain re-sync — stmt_bb should now be sent.
3423		while handler.pending_initial_syncs.contains_key(&peer_id) {
3424			handler.process_initial_sync_burst().await;
3425		}
3426
3427		let sent = notification_service.get_sent_notifications();
3428		let sent_hashes: HashSet<_> = sent
3429			.iter()
3430			.flat_map(|(_, notification)| {
3431				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
3432					StatementMessage::Statements(stmts) => stmts,
3433					_ => panic!("Expected StatementMessage::Statements"),
3434				}
3435			})
3436			.map(|s| s.hash())
3437			.collect();
3438		assert!(
3439			sent_hashes.contains(&hash_bb),
3440			"stmt_bb should now be sent after affinity expanded to include topic_bb"
3441		);
3442		// stmt_aa is also redelivered on affinity change.
3443		assert!(
3444			sent_hashes.contains(&hash_aa),
3445			"stmt_aa should be re-sent (known_statements cleared on affinity change)"
3446		);
3447	}
3448
3449	#[test]
3450	fn test_encode_statement_refs_matches_derive_encoding() {
3451		let mut stmt1 = Statement::new();
3452		stmt1.set_plain_data(b"first".to_vec());
3453		let mut stmt2 = Statement::new();
3454		stmt2.set_plain_data(b"second".to_vec());
3455
3456		let refs: Vec<&Statement> = vec![&stmt1, &stmt2];
3457		let hand_rolled = StatementMessage::encode_statement_refs(&refs);
3458		let derive_encoded = StatementMessage::Statements(vec![stmt1, stmt2]).encode();
3459
3460		assert_eq!(
3461			hand_rolled, derive_encoded,
3462			"encode_statement_refs must produce identical bytes to derive Encode"
3463		);
3464	}
3465
3466	#[test]
3467	fn test_encode_statement_refs_empty() {
3468		let refs: Vec<&Statement> = vec![];
3469		let hand_rolled = StatementMessage::encode_statement_refs(&refs);
3470		let derive_encoded = StatementMessage::Statements(vec![]).encode();
3471
3472		assert_eq!(hand_rolled, derive_encoded);
3473	}
3474
3475	#[test]
3476	fn test_can_receive_all_combinations() {
3477		let make_peer = |is_light: bool, version: PeerProtocolVersion, has_affinity: bool| {
3478			let topic_affinity = has_affinity.then(|| AffinityFilter::new(BLOOM_SEED, 0.01, 10));
3479			Peer {
3480				known_statements: LruHashSet::new(NonZeroUsize::new(10).unwrap()),
3481				rate_limiter: PeerRateLimiter::new(
3482					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).expect("nonzero"),
3483					NonZeroU32::new(
3484						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
3485					)
3486					.expect("nonzero"),
3487				),
3488				protocol_version: version,
3489				topic_affinity,
3490				is_light,
3491				pending_topic_affinity: None,
3492			}
3493		};
3494
3495		// Full node, V1, no affinity → can receive
3496		assert!(make_peer(false, PeerProtocolVersion::V1, false).can_receive());
3497		// Full node, V2, no affinity → can receive
3498		assert!(make_peer(false, PeerProtocolVersion::V2, false).can_receive());
3499		// Light, V1, no affinity → can receive (V1 doesn't gate)
3500		assert!(make_peer(true, PeerProtocolVersion::V1, false).can_receive());
3501		// Light, V2, no affinity → CANNOT receive (must set affinity first)
3502		assert!(!make_peer(true, PeerProtocolVersion::V2, false).can_receive());
3503		// Light, V2, with affinity → can receive
3504		assert!(make_peer(true, PeerProtocolVersion::V2, true).can_receive());
3505		// Full node, V2, with affinity → can receive
3506		assert!(make_peer(false, PeerProtocolVersion::V2, true).can_receive());
3507	}
3508
3509	#[tokio::test]
3510	async fn test_send_chunk_v1_vs_v2_encoding() {
3511		let (mut handler, _statement_store, _network, notification_service) =
3512			build_handler_no_peers();
3513
3514		let v1_peer = PeerId::random();
3515		let v2_peer = PeerId::random();
3516
3517		// Connect V1 peer.
3518		handler
3519			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3520				peer: v1_peer,
3521				direction: sc_network::service::traits::Direction::Inbound,
3522				handshake: vec![],
3523				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
3524			})
3525			.await;
3526
3527		// Connect V2 peer.
3528		handler
3529			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3530				peer: v2_peer,
3531				direction: sc_network::service::traits::Direction::Inbound,
3532				handshake: vec![],
3533				negotiated_fallback: None,
3534			})
3535			.await;
3536
3537		let mut stmt = Statement::new();
3538		stmt.set_plain_data(b"encoding test".to_vec());
3539
3540		// Send to V1 peer.
3541		notification_service.clear_sent_notifications();
3542		handler.send_statement_chunk(&v1_peer, &[&stmt]).await;
3543		let v1_sent = notification_service.get_sent_notifications();
3544		assert_eq!(v1_sent.len(), 1);
3545		let v1_bytes = &v1_sent[0].1;
3546		// V1 encoding is raw Vec<Statement>.
3547		let decoded_v1 = <Statements as Decode>::decode(&mut v1_bytes.as_slice())
3548			.expect("V1 peer should receive raw Vec<Statement> encoding");
3549		assert_eq!(decoded_v1.len(), 1);
3550
3551		// Send to V2 peer.
3552		notification_service.clear_sent_notifications();
3553		handler.send_statement_chunk(&v2_peer, &[&stmt]).await;
3554		let v2_sent = notification_service.get_sent_notifications();
3555		assert_eq!(v2_sent.len(), 1);
3556		let v2_bytes = &v2_sent[0].1;
3557		// V2 encoding is StatementMessage::Statements.
3558		let decoded_v2 = StatementMessage::decode(&mut v2_bytes.as_slice())
3559			.expect("V2 peer should receive StatementMessage encoding");
3560		match decoded_v2 {
3561			StatementMessage::Statements(stmts) => assert_eq!(stmts.len(), 1),
3562			_ => panic!("Expected StatementMessage::Statements for V2 peer"),
3563		}
3564
3565		// Verify the two encodings are different (V2 has an extra enum discriminant byte).
3566		assert_ne!(v1_bytes, v2_bytes, "V1 and V2 encodings should differ");
3567	}
3568
3569	#[tokio::test]
3570	async fn test_schedule_initial_sync_replaces_existing() {
3571		let (mut handler, statement_store, _network, _notification_service) =
3572			build_handler_no_peers();
3573
3574		let peer_id = PeerId::random();
3575
3576		// Add some statements to the store.
3577		let mut stmt1 = Statement::new();
3578		stmt1.set_plain_data(b"stmt1".to_vec());
3579		let hash1 = stmt1.hash();
3580		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
3581
3582		// Connect peer as V1.
3583		handler
3584			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3585				peer: peer_id,
3586				direction: sc_network::service::traits::Direction::Inbound,
3587				handshake: vec![],
3588				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
3589			})
3590			.await;
3591
3592		// Should have initial sync scheduled.
3593		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
3594		assert_eq!(
3595			handler.initial_sync_peer_queue.iter().filter(|p| **p == peer_id).count(),
3596			1,
3597			"Peer should appear exactly once in the queue"
3598		);
3599
3600		// Add another statement and re-schedule.
3601		let mut stmt2 = Statement::new();
3602		stmt2.set_plain_data(b"stmt2".to_vec());
3603		let hash2 = stmt2.hash();
3604		statement_store.statements.lock().unwrap().insert(hash2, stmt2);
3605
3606		handler.schedule_initial_sync_for_peer(peer_id);
3607
3608		// Peer should still appear exactly once in the queue (no duplicates).
3609		assert_eq!(
3610			handler.initial_sync_peer_queue.iter().filter(|p| **p == peer_id).count(),
3611			1,
3612			"Peer should NOT be duplicated in the queue after re-schedule"
3613		);
3614		// The new sync should contain both hashes.
3615		let pending = handler.pending_initial_syncs.get(&peer_id).unwrap();
3616		assert!(pending.hashes.contains(&hash1));
3617		assert!(pending.hashes.contains(&hash2));
3618	}
3619
3620	#[tokio::test]
3621	async fn test_initial_sync_queued_during_major_sync_processed_after() {
3622		let statement_store = TestStatementStore::new();
3623		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
3624		let network = TestNetwork::new();
3625		let notification_service = TestNotificationService::new();
3626		let sync = TestSync::new();
3627		// Set major syncing to true.
3628		sync.major_syncing.store(true, Ordering::Relaxed);
3629
3630		let mut handler = StatementHandler {
3631			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
3632			notification_service: Box::new(notification_service.clone()),
3633			propagate_timeout: (Box::pin(futures::stream::pending())
3634				as Pin<Box<dyn Stream<Item = ()> + Send>>)
3635				.fuse(),
3636			pending_statements: FuturesUnordered::new(),
3637			pending_statements_peers: HashMap::new(),
3638			network: network.clone(),
3639			sync: sync.clone(),
3640			sync_event_stream: (Box::pin(futures::stream::pending())
3641				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
3642				.fuse(),
3643			peers: HashMap::new(),
3644			statement_store: Arc::new(statement_store.clone()),
3645			queue_sender,
3646			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
3647				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
3648			metrics: None,
3649			initial_sync_timeout: Box::pin(futures::future::pending()),
3650			pending_affinities_timeout: Box::pin(futures::future::pending()),
3651			pending_initial_syncs: HashMap::new(),
3652			initial_sync_peer_queue: VecDeque::new(),
3653			deferred_peers: HashSet::new(),
3654			dropped_statements_during_sync: false,
3655			sync_recovery_peer: None,
3656			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
3657		};
3658
3659		// Add a statement so there's something to sync.
3660		let mut stmt = Statement::new();
3661		stmt.set_plain_data(b"during major sync".to_vec());
3662		let hash = stmt.hash();
3663		statement_store.statements.lock().unwrap().insert(hash, stmt);
3664
3665		// Add a peer manually.
3666		let peer_id = PeerId::random();
3667		handler.peers.insert(
3668			peer_id,
3669			Peer::new_for_testing(
3670				LruHashSet::new(NonZeroUsize::new(100).unwrap()),
3671				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).unwrap(),
3672				NonZeroU32::new(
3673					DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
3674				)
3675				.unwrap(),
3676			),
3677		);
3678
3679		// Scheduling during major sync should queue the peer.
3680		handler.schedule_initial_sync_for_peer(peer_id);
3681
3682		assert!(
3683			handler.pending_initial_syncs.contains_key(&peer_id),
3684			"Initial sync should be queued even during major sync"
3685		);
3686		assert_eq!(handler.initial_sync_peer_queue.len(), 1);
3687
3688		// But burst processing should be a no-op while major syncing.
3689		handler.process_initial_sync_burst().await;
3690		assert!(
3691			handler.pending_initial_syncs.contains_key(&peer_id),
3692			"Pending sync should remain untouched during major sync"
3693		);
3694
3695		// Once major sync completes, burst processing should proceed.
3696		sync.major_syncing.store(false, Ordering::Relaxed);
3697		handler.process_initial_sync_burst().await;
3698		assert!(
3699			handler.initial_sync_peer_queue.is_empty(),
3700			"Peer should have been processed after major sync ended"
3701		);
3702	}
3703
3704	#[tokio::test]
3705	async fn test_schedule_initial_sync_resends_all_matching() {
3706		let (mut handler, statement_store, _network, _notification_service) =
3707			build_handler_no_peers();
3708
3709		let peer_id = PeerId::random();
3710
3711		// Add statements to the store.
3712		let mut stmt1 = Statement::new();
3713		stmt1.set_plain_data(b"known".to_vec());
3714		let hash1 = stmt1.hash();
3715		let mut stmt2 = Statement::new();
3716		stmt2.set_plain_data(b"unknown".to_vec());
3717		let hash2 = stmt2.hash();
3718
3719		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
3720		statement_store.statements.lock().unwrap().insert(hash2, stmt2);
3721
3722		// Add peer manually with hash1 already known.
3723		let mut known = LruHashSet::new(NonZeroUsize::new(100).unwrap());
3724		known.insert(hash1);
3725		handler.peers.insert(
3726			peer_id,
3727			Peer {
3728				known_statements: known,
3729				rate_limiter: PeerRateLimiter::new(
3730					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).unwrap(),
3731					NonZeroU32::new(
3732						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
3733					)
3734					.unwrap(),
3735				),
3736				protocol_version: PeerProtocolVersion::V1,
3737				topic_affinity: None,
3738				is_light: false,
3739				pending_topic_affinity: None,
3740			},
3741		);
3742
3743		handler.schedule_initial_sync_for_peer(peer_id);
3744
3745		let pending = handler.pending_initial_syncs.get(&peer_id).unwrap();
3746		// all hashes are included for redelivery.
3747		assert!(
3748			pending.hashes.contains(&hash1),
3749			"Previously known hash should be included after affinity change"
3750		);
3751		assert!(pending.hashes.contains(&hash2), "Unknown hash should be included in initial sync");
3752		// known_statements should have been cleared.
3753		let peer_data = handler.peers.get(&peer_id).unwrap();
3754		assert!(
3755			!peer_data.known_statements.contains(&hash1),
3756			"known_statements should be cleared after schedule_initial_sync_for_peer"
3757		);
3758	}
3759
3760	#[tokio::test]
3761	async fn test_malformed_v2_message_does_not_panic() {
3762		let (mut handler, _statement_store, _network, _notification_service) =
3763			build_handler_no_peers();
3764
3765		let peer_id = PeerId::random();
3766
3767		// Connect peer as V2.
3768		handler
3769			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3770				peer: peer_id,
3771				direction: sc_network::service::traits::Direction::Inbound,
3772				handshake: vec![],
3773				negotiated_fallback: None,
3774			})
3775			.await;
3776
3777		// Send garbage data — should not panic, just log debug.
3778		handler
3779			.handle_notification_event(NotificationEvent::NotificationReceived {
3780				peer: peer_id,
3781				notification: vec![0xFF, 0xFE, 0xFD].into(),
3782			})
3783			.await;
3784
3785		// Send V1-encoded data to V2 peer — also should not panic.
3786		let mut stmt = Statement::new();
3787		stmt.set_plain_data(b"v1 encoded".to_vec());
3788		let v1_encoded = vec![stmt].encode();
3789		handler
3790			.handle_notification_event(NotificationEvent::NotificationReceived {
3791				peer: peer_id,
3792				notification: v1_encoded.into(),
3793			})
3794			.await;
3795
3796		// If we got here without panic, the test passes.
3797		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
3798	}
3799
3800	#[test]
3801	fn test_find_sendable_chunk_v2_overhead() {
3802		let v1_max = max_statement_payload_size(V1_ENVELOPE_OVERHEAD);
3803		let v2_max = max_statement_payload_size(V2_ENVELOPE_OVERHEAD);
3804
3805		// V2 has strictly less payload space than V1.
3806		assert!(
3807			v2_max < v1_max,
3808			"V2 payload capacity ({v2_max}) should be less than V1 ({v1_max})"
3809		);
3810		assert_eq!(v1_max - v2_max, 1, "V2 overhead is exactly 1 byte more than V1");
3811
3812		// Create enough statements to fill V1 but not V2.
3813		let stmts: Vec<Statement> = (0..1000)
3814			.map(|i| {
3815				let mut s = Statement::new();
3816				s.set_plain_data(format!("stmt-{i}").into_bytes());
3817				s
3818			})
3819			.collect();
3820		let refs: Vec<&Statement> = stmts.iter().collect();
3821
3822		let v1_chunk = find_sendable_chunk(&refs, V1_ENVELOPE_OVERHEAD);
3823		let v2_chunk = find_sendable_chunk(&refs, V2_ENVELOPE_OVERHEAD);
3824
3825		// V2 should fit the same or fewer statements.
3826		let v1_count = match v1_chunk {
3827			ChunkResult::Send(n) => n,
3828			_ => panic!("Expected Send for V1"),
3829		};
3830		let v2_count = match v2_chunk {
3831			ChunkResult::Send(n) => n,
3832			_ => panic!("Expected Send for V2"),
3833		};
3834		assert!(
3835			v2_count <= v1_count,
3836			"V2 ({v2_count}) should fit at most as many statements as V1 ({v1_count})"
3837		);
3838	}
3839
3840	#[tokio::test]
3841	async fn test_full_node_v2_gets_initial_sync_immediately() {
3842		let (mut handler, statement_store, _network, _notification_service) =
3843			build_handler_no_peers();
3844
3845		// Add a statement so there's something to sync.
3846		let mut stmt = Statement::new();
3847		stmt.set_plain_data(b"full node v2".to_vec());
3848		let hash = stmt.hash();
3849		statement_store.statements.lock().unwrap().insert(hash, stmt);
3850
3851		let peer_id = PeerId::random();
3852
3853		// Connect as full-node V2 (no fallback, network returns Full role).
3854		handler
3855			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
3856				peer: peer_id,
3857				direction: sc_network::service::traits::Direction::Inbound,
3858				handshake: vec![],
3859				negotiated_fallback: None,
3860			})
3861			.await;
3862
3863		// Full-node V2 peer should get initial sync immediately (not gated).
3864		assert!(
3865			handler.pending_initial_syncs.contains_key(&peer_id),
3866			"Full-node V2 peer should have initial sync scheduled immediately"
3867		);
3868		assert_eq!(handler.peers.get(&peer_id).unwrap().protocol_version, PeerProtocolVersion::V2);
3869		assert!(!handler.peers.get(&peer_id).unwrap().is_light);
3870	}
3871
3872	#[tokio::test]
3873	async fn test_propagation_reaches_all_connected_peers() {
3874		let (
3875			mut handler,
3876			statement_store,
3877			_network,
3878			notification_service,
3879			_queue_receiver,
3880			peer_ids,
3881		) = build_handler(5);
3882
3883		// Insert 3 statements into recent_statements for propagation
3884		let mut expected_hashes = Vec::new();
3885		for i in 0..3u8 {
3886			let mut statement = Statement::new();
3887			statement.set_plain_data(vec![i; 100]);
3888			let hash = statement.hash();
3889			expected_hashes.push(hash);
3890			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
3891		}
3892		expected_hashes.sort();
3893
3894		handler.propagate_statements().await;
3895
3896		let sent = notification_service.get_sent_notifications();
3897
3898		// Verify each peer received all 3 statements
3899		for peer_id in &peer_ids {
3900			let mut received_hashes = get_peer_hashes(&sent, *peer_id);
3901			received_hashes.sort();
3902
3903			assert_eq!(
3904				received_hashes, expected_hashes,
3905				"Peer {peer_id} should have received all 3 statements"
3906			);
3907		}
3908
3909		// Recent statements should be drained
3910		assert!(statement_store.recent_statements.lock().unwrap().is_empty());
3911	}
3912
3913	#[tokio::test]
3914	async fn test_known_statement_filtering_per_peer() {
3915		let (
3916			mut handler,
3917			statement_store,
3918			_network,
3919			notification_service,
3920			_queue_receiver,
3921			peer_ids,
3922		) = build_handler(3);
3923
3924		let peer_a = peer_ids[0];
3925		let peer_b = peer_ids[1];
3926		let peer_c = peer_ids[2];
3927
3928		// Create 5 statements
3929		let mut hashes = Vec::new();
3930		for i in 0..5u8 {
3931			let mut statement = Statement::new();
3932			statement.set_plain_data(vec![i; 100]);
3933			let hash = statement.hash();
3934			hashes.push(hash);
3935			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
3936		}
3937
3938		// Pre-populate known_statements: peer_a knows s1,s2; peer_b knows s3; peer_c knows none
3939		handler.peers.get_mut(&peer_a).unwrap().known_statements.insert(hashes[0]);
3940		handler.peers.get_mut(&peer_a).unwrap().known_statements.insert(hashes[1]);
3941		handler.peers.get_mut(&peer_b).unwrap().known_statements.insert(hashes[2]);
3942
3943		handler.propagate_statements().await;
3944
3945		let sent = notification_service.get_sent_notifications();
3946
3947		let peer_a_hashes = get_peer_hashes(&sent, peer_a);
3948		let peer_b_hashes = get_peer_hashes(&sent, peer_b);
3949		let peer_c_hashes = get_peer_hashes(&sent, peer_c);
3950
3951		// peer_a already knows s1,s2 → should only get s3,s4,s5
3952		assert_eq!(peer_a_hashes.len(), 3, "peer_a should get 3 statements");
3953		assert!(!peer_a_hashes.contains(&hashes[0]), "peer_a already knows s1");
3954		assert!(!peer_a_hashes.contains(&hashes[1]), "peer_a already knows s2");
3955		assert!(peer_a_hashes.contains(&hashes[2]));
3956		assert!(peer_a_hashes.contains(&hashes[3]));
3957		assert!(peer_a_hashes.contains(&hashes[4]));
3958
3959		// peer_b already knows s3 → should get s1,s2,s4,s5
3960		assert_eq!(peer_b_hashes.len(), 4, "peer_b should get 4 statements");
3961		assert!(!peer_b_hashes.contains(&hashes[2]), "peer_b already knows s3");
3962		assert!(peer_b_hashes.contains(&hashes[0]));
3963		assert!(peer_b_hashes.contains(&hashes[1]));
3964		assert!(peer_b_hashes.contains(&hashes[3]));
3965		assert!(peer_b_hashes.contains(&hashes[4]));
3966
3967		// peer_c knows nothing → should get all 5
3968		let mut sorted_peer_c: Vec<_> = peer_c_hashes.into_iter().collect();
3969		sorted_peer_c.sort();
3970		let mut all_hashes = hashes.clone();
3971		all_hashes.sort();
3972		assert_eq!(sorted_peer_c, all_hashes, "peer_c should get all 5 statements");
3973	}
3974
3975	/// Verifies that peers connecting during major sync are buffered in `deferred_peers` with no
3976	/// network calls, and that a disconnect before sync ends removes the peer from the buffer
3977	#[test]
3978	fn major_sync_defers_peers_and_handles_disconnect() {
3979		let (sync, _flag) = TestSync::with_syncing(true);
3980		let network = TestNetwork::new();
3981		let notification_service = TestNotificationService::new();
3982		let statement_store = TestStatementStore::new();
3983		let (queue_sender, _queue_receiver) = async_channel::bounded(100);
3984
3985		let mut handler = StatementHandler {
3986			protocol_name: "/statement/1".into(),
3987			notification_service: Box::new(notification_service),
3988			propagate_timeout: (Box::pin(futures::stream::pending())
3989				as Pin<Box<dyn Stream<Item = ()> + Send>>)
3990				.fuse(),
3991			pending_statements: FuturesUnordered::new(),
3992			pending_statements_peers: HashMap::new(),
3993			network: network.clone(),
3994			sync,
3995			sync_event_stream: (Box::pin(futures::stream::pending())
3996				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
3997				.fuse(),
3998			peers: HashMap::new(),
3999			statement_store: Arc::new(statement_store),
4000			queue_sender,
4001			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
4002				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
4003			metrics: None,
4004			initial_sync_timeout: Box::pin(futures::future::pending()),
4005			pending_affinities_timeout: Box::pin(futures::future::pending()),
4006			pending_initial_syncs: HashMap::new(),
4007			initial_sync_peer_queue: VecDeque::new(),
4008			deferred_peers: HashSet::new(),
4009			dropped_statements_during_sync: false,
4010			sync_recovery_peer: None,
4011			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
4012		};
4013
4014		let peer1 = PeerId::random();
4015		let peer2 = PeerId::random();
4016		let peer3 = PeerId::random();
4017
4018		handler.handle_sync_event(SyncEvent::PeerConnected(peer1));
4019		handler.handle_sync_event(SyncEvent::PeerConnected(peer2));
4020		handler.handle_sync_event(SyncEvent::PeerConnected(peer3));
4021
4022		// No network calls while major sync is active
4023		assert!(network.get_added_reserved().is_empty());
4024		assert!(network.get_removed_reserved().is_empty());
4025		assert_eq!(handler.deferred_peers.len(), 3);
4026
4027		// Disconnect before sync ends must remove from buffer only
4028		handler.handle_sync_event(SyncEvent::PeerDisconnected(peer1));
4029		assert_eq!(handler.deferred_peers.len(), 2);
4030		assert!(!handler.deferred_peers.contains(&peer1), "disconnected peer must leave buffer");
4031		assert!(handler.deferred_peers.contains(&peer2));
4032		assert!(handler.deferred_peers.contains(&peer3));
4033		assert!(network.get_removed_reserved().is_empty(), "no remove call for buffered peer");
4034	}
4035
4036	#[test]
4037	fn deferred_peers_flushed_on_sync_end_without_remove() {
4038		let (sync, flag) = TestSync::with_syncing(true);
4039		let network = TestNetwork::new();
4040		let notification_service = TestNotificationService::new();
4041		let statement_store = TestStatementStore::new();
4042		let (queue_sender, _queue_receiver) = async_channel::bounded(100);
4043
4044		let peer1 = PeerId::random();
4045		let peer2 = PeerId::random();
4046		let mut deferred = HashSet::new();
4047		deferred.insert(peer1);
4048		deferred.insert(peer2);
4049
4050		let mut handler = StatementHandler {
4051			protocol_name: "/statement/1".into(),
4052			notification_service: Box::new(notification_service),
4053			propagate_timeout: (Box::pin(futures::stream::pending())
4054				as Pin<Box<dyn Stream<Item = ()> + Send>>)
4055				.fuse(),
4056			pending_statements: FuturesUnordered::new(),
4057			pending_statements_peers: HashMap::new(),
4058			network: network.clone(),
4059			sync,
4060			sync_event_stream: (Box::pin(futures::stream::pending())
4061				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
4062				.fuse(),
4063			peers: HashMap::new(),
4064			statement_store: Arc::new(statement_store),
4065			queue_sender,
4066			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
4067				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
4068			metrics: None,
4069			initial_sync_timeout: Box::pin(futures::future::pending()),
4070			pending_affinities_timeout: Box::pin(futures::future::pending()),
4071			pending_initial_syncs: HashMap::new(),
4072			initial_sync_peer_queue: VecDeque::new(),
4073			deferred_peers: deferred,
4074			dropped_statements_during_sync: false,
4075			sync_recovery_peer: None,
4076			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
4077		};
4078
4079		flag.store(false, std::sync::atomic::Ordering::Relaxed);
4080		handler.drain_deferred_peers();
4081
4082		assert!(handler.deferred_peers.is_empty());
4083
4084		let added = network.get_added_reserved();
4085		assert_eq!(added.len(), 1);
4086		let added_addrs = &added[0];
4087		let expected_addr1: sc_network::Multiaddr =
4088			iter::once(multiaddr::Protocol::P2p(peer1.into())).collect();
4089		let expected_addr2: sc_network::Multiaddr =
4090			iter::once(multiaddr::Protocol::P2p(peer2.into())).collect();
4091		assert!(added_addrs.contains(&expected_addr1), "peer1 must be in added set");
4092		assert!(added_addrs.contains(&expected_addr2), "peer2 must be in added set");
4093
4094		assert!(network.get_removed_reserved().is_empty());
4095	}
4096
4097	#[tokio::test]
4098	async fn sync_recovery_schedules_remove_for_one_connected_peer() {
4099		let network = TestNetwork::new();
4100		let notification_service = TestNotificationService::new();
4101		let (sync, _flag) = TestSync::with_syncing(false);
4102		let (queue_sender, _) = async_channel::bounded(2);
4103		let statement_store = TestStatementStore::new();
4104
4105		let connected_peer = PeerId::random();
4106
4107		let mut peers = HashMap::new();
4108		peers.insert(
4109			connected_peer,
4110			Peer {
4111				known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
4112				rate_limiter: PeerRateLimiter::new(
4113					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
4114						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
4115					NonZeroU32::new(
4116						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
4117					)
4118					.expect("burst capacity is nonzero"),
4119				),
4120				protocol_version: PeerProtocolVersion::V1,
4121				topic_affinity: None,
4122				is_light: false,
4123				pending_topic_affinity: None,
4124			},
4125		);
4126
4127		let mut handler = StatementHandler {
4128			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
4129			notification_service: Box::new(notification_service),
4130			propagate_timeout: (Box::pin(futures::stream::pending())
4131				as Pin<Box<dyn Stream<Item = ()> + Send>>)
4132				.fuse(),
4133			pending_statements: FuturesUnordered::new(),
4134			pending_statements_peers: HashMap::new(),
4135			network: network.clone(),
4136			sync,
4137			sync_event_stream: (Box::pin(futures::stream::pending())
4138				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
4139				.fuse(),
4140			peers,
4141			statement_store: Arc::new(statement_store),
4142			queue_sender,
4143			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
4144				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
4145			metrics: None,
4146			initial_sync_timeout: Box::pin(futures::future::pending()),
4147			pending_affinities_timeout: Box::pin(futures::future::pending()),
4148			pending_initial_syncs: HashMap::new(),
4149			initial_sync_peer_queue: VecDeque::new(),
4150			deferred_peers: HashSet::new(),
4151			dropped_statements_during_sync: true,
4152			sync_recovery_peer: None,
4153			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
4154		};
4155
4156		handler.start_sync_recovery();
4157
4158		// One remove call must have been issued for the connected peer
4159		{
4160			let removed = network.removed_reserved.lock().unwrap();
4161			assert_eq!(
4162				removed.len(),
4163				1,
4164				"Expected exactly one remove_peers_from_reserved_set call"
4165			);
4166			assert!(removed[0].contains(&connected_peer));
4167		}
4168
4169		// The recovery peer must be stored and the timeout future must be armed
4170		assert_eq!(handler.sync_recovery_peer, Some(connected_peer));
4171
4172		// Calling try_readd_sync_recovery_peer directly (as the select arm would after the future
4173		// resolves) must re-add the peer and clear the field
4174		handler.try_readd_sync_recovery_peer();
4175		assert!(handler.sync_recovery_peer.is_none());
4176		{
4177			let added = network.added_reserved.lock().unwrap();
4178			assert_eq!(added.len(), 1);
4179			let expected_addr: multiaddr::Multiaddr =
4180				iter::once(multiaddr::Protocol::P2p(connected_peer.into())).collect();
4181			assert!(added[0].contains(&expected_addr));
4182		}
4183
4184		// Re-entry guard: restore state to simulate a second sync-end while recovery is still
4185		// in flight (sync_recovery_peer is Some). The second call must not issue another remove.
4186		{
4187			let peer2 = PeerId::random();
4188			handler.sync_recovery_peer = Some(peer2);
4189			handler.start_sync_recovery();
4190			assert_eq!(
4191				handler.sync_recovery_peer,
4192				Some(peer2),
4193				"Re-entry guard: recovery peer must not change on second call"
4194			);
4195			assert_eq!(
4196				network.removed_reserved.lock().unwrap().len(),
4197				1,
4198				"Re-entry guard: no extra remove call while recovery is in flight"
4199			);
4200		}
4201	}
4202
4203	#[tokio::test]
4204	async fn sync_recovery_gated_by_dropped_statements_flag() {
4205		let make_peer = || Peer {
4206			known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
4207			rate_limiter: PeerRateLimiter::new(
4208				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
4209					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
4210				NonZeroU32::new(
4211					DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
4212				)
4213				.expect("burst capacity is nonzero"),
4214			),
4215			protocol_version: PeerProtocolVersion::V1,
4216			topic_affinity: None,
4217			is_light: false,
4218			pending_topic_affinity: None,
4219		};
4220
4221		let make_handler =
4222			|network: TestNetwork, dropped: bool| -> StatementHandler<TestNetwork, TestSync> {
4223				let (sync, _) = TestSync::with_syncing(false);
4224				let (queue_sender, _) = async_channel::bounded(2);
4225				let mut peers = HashMap::new();
4226				peers.insert(PeerId::random(), make_peer());
4227				StatementHandler {
4228					protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
4229					notification_service: Box::new(TestNotificationService::new()),
4230					propagate_timeout: (Box::pin(futures::stream::pending())
4231						as Pin<Box<dyn Stream<Item = ()> + Send>>)
4232						.fuse(),
4233					pending_statements: FuturesUnordered::new(),
4234					pending_statements_peers: HashMap::new(),
4235					network,
4236					sync,
4237					sync_event_stream: (Box::pin(futures::stream::pending())
4238						as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
4239						.fuse(),
4240					peers,
4241					statement_store: Arc::new(TestStatementStore::new()),
4242					queue_sender,
4243					statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
4244						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
4245					metrics: None,
4246					initial_sync_timeout: Box::pin(futures::future::pending()),
4247					pending_affinities_timeout: Box::pin(futures::future::pending()),
4248					pending_initial_syncs: HashMap::new(),
4249					initial_sync_peer_queue: VecDeque::new(),
4250					deferred_peers: HashSet::new(),
4251					dropped_statements_during_sync: dropped,
4252					sync_recovery_peer: None,
4253					sync_recovery_readd_timeout: Box::pin(pending().fuse()),
4254				}
4255			};
4256
4257		// flag=false → no recovery
4258		let net = TestNetwork::new();
4259		let mut handler = make_handler(net.clone(), false);
4260		handler.start_sync_recovery();
4261		assert!(handler.sync_recovery_peer.is_none());
4262		assert!(net.get_removed_reserved().is_empty());
4263
4264		// flag=true → recovery fires
4265		let net2 = TestNetwork::new();
4266		let mut handler2 = make_handler(net2.clone(), true);
4267		handler2.start_sync_recovery();
4268		assert!(handler2.sync_recovery_peer.is_some());
4269		assert_eq!(net2.get_removed_reserved().len(), 1);
4270	}
4271}