Skip to main content

sc_network_statement/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Statement handling to plug on top of the network service.
20//!
21//! Usage:
22//!
23//! - Use [`StatementHandlerPrototype::new`] to create a prototype.
24//! - Pass the `NonDefaultSetConfig` returned from [`StatementHandlerPrototype::new`] to the network
25//!   configuration as an extra peers set.
26//! - Use [`StatementHandlerPrototype::build`] then [`StatementHandler::run`] to obtain a
27//! `Future` that processes statements.
28
29use crate::config::*;
30
31use codec::{Compact, Decode, Encode, MaxEncodedLen};
32use futures::{
33	channel::oneshot,
34	future::{pending, FusedFuture},
35	prelude::*,
36	stream::FuturesUnordered,
37};
38use governor::{
39	clock::DefaultClock,
40	state::{InMemoryState, NotKeyed},
41	Quota, RateLimiter,
42};
43use prometheus_endpoint::{
44	exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
45	Registry, U64,
46};
47use rand::seq::IteratorRandom;
48use sc_network::{
49	config::{NonReservedPeerMode, SetConfig},
50	error, multiaddr,
51	peer_store::PeerStoreProvider,
52	service::{
53		traits::{NotificationEvent, NotificationService, ValidationResult},
54		NotificationMetrics,
55	},
56	types::ProtocolName,
57	utils::{interval, LruHashSet},
58	NetworkBackend, NetworkEventStream, NetworkPeers,
59};
60use sc_network_sync::{SyncEvent, SyncEventStream};
61use sc_network_types::PeerId;
62use sp_runtime::traits::Block as BlockT;
63use sp_statement_store::{
64	FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
65};
66use std::{
67	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
68	iter,
69	num::{NonZeroU32, NonZeroUsize},
70	pin::Pin,
71	sync::Arc,
72	time::Instant,
73};
74use tokio::time::timeout;
75pub mod config;
76
77/// A set of statements.
78pub type Statements = Vec<Statement>;
79/// Future resolving to statement import result.
80pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
81
82mod rep {
83	use sc_network::ReputationChange as Rep;
84	/// Reputation change when a peer sends us any statement.
85	///
86	/// This forces node to verify it, thus the negative value here. Once statement is verified,
87	/// reputation change should be refunded with `ANY_STATEMENT_REFUND`
88	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
89	/// Reputation change when a peer sends us any statement that is not invalid.
90	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
91	/// Reputation change when a peer sends us an statement that we didn't know about.
92	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
93	/// Reputation change when a peer sends us an invalid statement.
94	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
95	/// Reputation change when a peer sends us a duplicate statement.
96	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
97	/// Reputation change when a peer floods us with statements.
98	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
99}
100
101const LOG_TARGET: &str = "statement-gossip";
102/// Maximim time we wait for sending a notification to a peer.
103const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
104/// Interval for sending statement batches during initial sync to new peers.
105const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
106/// Delay before re-adding a peer to the reserved set after a forced disconnect for sync recovery.
107const SYNC_RECOVERY_READD_DELAY: std::time::Duration = std::time::Duration::from_secs(60);
108
109struct Metrics {
110	propagated_statements: Counter<U64>,
111	known_statements_received: Counter<U64>,
112	skipped_oversized_statements: Counter<U64>,
113	propagated_statements_chunks: Histogram,
114	pending_statements: Gauge<U64>,
115	ignored_statements: Counter<U64>,
116	peers_connected: Gauge<U64>,
117	statements_received: Counter<U64>,
118	bytes_sent_total: Counter<U64>,
119	bytes_received_total: Counter<U64>,
120	sent_latency_seconds: Histogram,
121	initial_sync_statements_sent: Counter<U64>,
122	initial_sync_bursts_total: Counter<U64>,
123	initial_sync_peers_active: Gauge<U64>,
124	initial_sync_duration_seconds: Histogram,
125	statement_flooding_detected: Counter<U64>,
126}
127
128impl Metrics {
129	fn register(r: &Registry) -> Result<Self, PrometheusError> {
130		Ok(Self {
131			propagated_statements: register(
132				Counter::new(
133					"substrate_sync_propagated_statements",
134					"Number of statements propagated to at least one peer",
135				)?,
136				r,
137			)?,
138			known_statements_received: register(
139				Counter::new(
140					"substrate_sync_known_statement_received",
141					"Number of statements received via gossiping that were already in the statement store",
142				)?,
143				r,
144			)?,
145			skipped_oversized_statements: register(
146				Counter::new(
147					"substrate_sync_skipped_oversized_statements",
148					"Number of oversized statements that were skipped to be gossiped",
149				)?,
150				r,
151			)?,
152			propagated_statements_chunks: register(
153				Histogram::with_opts(
154					HistogramOpts::new(
155						"substrate_sync_propagated_statements_chunks",
156						"Distribution of chunk sizes when propagating statements",
157					)
158					.buckets(exponential_buckets(1.0, 2.0, 14)?),
159				)?,
160				r,
161			)?,
162			pending_statements: register(
163				Gauge::new(
164					"substrate_sync_pending_statement_validations",
165					"Number of pending statement validations",
166				)?,
167				r,
168			)?,
169			ignored_statements: register(
170				Counter::new(
171					"substrate_sync_ignored_statements",
172					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
173				)?,
174				r,
175			)?,
176			peers_connected: register(
177				Gauge::new(
178					"substrate_sync_statement_peers_connected",
179					"Number of peers connected using the statement protocol",
180				)?,
181				r,
182			)?,
183			statements_received: register(
184				Counter::new(
185					"substrate_sync_statements_received",
186					"Total number of statements received from peers",
187				)?,
188				r,
189			)?,
190			bytes_sent_total: register(
191				Counter::new(
192					"substrate_sync_statement_bytes_sent_total",
193					"Total bytes sent for statement protocol messages",
194				)?,
195				r,
196			)?,
197			bytes_received_total: register(
198				Counter::new(
199					"substrate_sync_statement_bytes_received_total",
200					"Total bytes received for statement protocol messages",
201				)?,
202				r,
203			)?,
204			sent_latency_seconds: register(
205				Histogram::with_opts(
206					HistogramOpts::new(
207						"substrate_sync_statement_sent_latency_seconds",
208						"Time to send statement messages to peers",
209					)
210					// Buckets from 1μs to ~1s covering microsecond to millisecond range.
211					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
212				)?,
213				r,
214			)?,
215			initial_sync_statements_sent: register(
216				Counter::new(
217					"substrate_sync_initial_sync_statements_sent",
218					"Total statements sent during initial sync bursts to newly connected peers",
219				)?,
220				r,
221			)?,
222			initial_sync_bursts_total: register(
223				Counter::new(
224					"substrate_sync_initial_sync_bursts_total",
225					"Total number of initial sync burst rounds processed",
226				)?,
227				r,
228			)?,
229			initial_sync_peers_active: register(
230				Gauge::new(
231					"substrate_sync_initial_sync_peers_active",
232					"Number of peers currently being synced via initial sync",
233				)?,
234				r,
235			)?,
236			initial_sync_duration_seconds: register(
237				Histogram::with_opts(
238					HistogramOpts::new(
239						"substrate_sync_initial_sync_duration_seconds",
240						"Per-peer total duration of initial sync from start to completion",
241					)
242					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
243				)?,
244				r,
245			)?,
246			statement_flooding_detected: register(
247				Counter::new(
248					"substrate_sync_statement_flooding_detected",
249					"Number of peers disconnected for exceeding statement rate limits",
250				)?,
251				r,
252			)?,
253		})
254	}
255}
256
257/// Prototype for a [`StatementHandler`].
258pub struct StatementHandlerPrototype {
259	protocol_name: ProtocolName,
260	notification_service: Box<dyn NotificationService>,
261}
262
263impl StatementHandlerPrototype {
264	/// Create a new instance.
265	pub fn new<
266		Hash: AsRef<[u8]>,
267		Block: BlockT,
268		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
269	>(
270		genesis_hash: Hash,
271		fork_id: Option<&str>,
272		metrics: NotificationMetrics,
273		peer_store_handle: Arc<dyn PeerStoreProvider>,
274	) -> (Self, Net::NotificationProtocolConfig) {
275		let genesis_hash = genesis_hash.as_ref();
276		let protocol_name = if let Some(fork_id) = fork_id {
277			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
278		} else {
279			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
280		};
281		let (config, notification_service) = Net::notification_config(
282			protocol_name.clone().into(),
283			Vec::new(),
284			MAX_STATEMENT_NOTIFICATION_SIZE,
285			None,
286			SetConfig {
287				in_peers: 0,
288				out_peers: 0,
289				reserved_nodes: Vec::new(),
290				non_reserved_mode: NonReservedPeerMode::Deny,
291			},
292			metrics,
293			peer_store_handle,
294		);
295
296		(Self { protocol_name: protocol_name.into(), notification_service }, config)
297	}
298
299	/// Turns the prototype into the actual handler.
300	///
301	/// Important: the statements handler is initially disabled and doesn't gossip statements.
302	/// Gossiping is enabled when major syncing is done.
303	pub fn build<
304		N: NetworkPeers + NetworkEventStream,
305		S: SyncEventStream + sp_consensus::SyncOracle,
306	>(
307		self,
308		network: N,
309		sync: S,
310		statement_store: Arc<dyn StatementStore>,
311		metrics_registry: Option<&Registry>,
312		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
313		mut num_submission_workers: usize,
314		statements_per_second: u32,
315	) -> error::Result<StatementHandler<N, S>> {
316		let sync_event_stream = sync.event_stream("statement-handler-sync");
317		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
318
319		if num_submission_workers == 0 {
320			log::warn!(
321				target: LOG_TARGET,
322				"num_submission_workers is 0, defaulting to 1"
323			);
324			num_submission_workers = 1;
325		}
326
327		let statements_per_second = match NonZeroU32::new(statements_per_second) {
328			Some(rate) => rate,
329			None => {
330				log::warn!(
331					target: LOG_TARGET,
332					"statements_per_second is 0, defaulting to {}",
333					DEFAULT_STATEMENTS_PER_SECOND
334				);
335				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
336					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
337			},
338		};
339
340		let metrics =
341			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };
342
343		for _ in 0..num_submission_workers {
344			let store = statement_store.clone();
345			let mut queue_receiver = queue_receiver.clone();
346			executor(
347				async move {
348					loop {
349						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
350							queue_receiver.next().await;
351						match task {
352							None => return,
353							Some((statement, completion)) => {
354								let result = store.submit(statement, StatementSource::Network);
355								if completion.send(result).is_err() {
356									log::debug!(
357										target: LOG_TARGET,
358										"Error sending validation completion"
359									);
360								}
361							},
362						}
363					}
364				}
365				.boxed(),
366			);
367		}
368
369		let handler = StatementHandler {
370			protocol_name: self.protocol_name,
371			notification_service: self.notification_service,
372			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
373				as Pin<Box<dyn Stream<Item = ()> + Send>>)
374				.fuse(),
375			pending_statements: FuturesUnordered::new(),
376			pending_statements_peers: HashMap::new(),
377			network,
378			sync,
379			sync_event_stream: sync_event_stream.fuse(),
380			peers: HashMap::new(),
381			statement_store,
382			queue_sender,
383			statements_per_second,
384			metrics,
385			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
386			pending_initial_syncs: HashMap::new(),
387			initial_sync_peer_queue: VecDeque::new(),
388			deferred_peers: HashSet::new(),
389			dropped_statements_during_sync: false,
390			sync_recovery_peer: None,
391			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
392		};
393
394		Ok(handler)
395	}
396}
397
398/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
399pub struct StatementHandler<
400	N: NetworkPeers + NetworkEventStream,
401	S: SyncEventStream + sp_consensus::SyncOracle,
402> {
403	protocol_name: ProtocolName,
404	/// Interval at which we call `propagate_statements`.
405	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
406	/// Pending statements verification tasks.
407	pending_statements:
408		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
409	/// As multiple peers can send us the same statement, we group
410	/// these peers using the statement hash while the statement is
411	/// imported. This prevents that we import the same statement
412	/// multiple times concurrently.
413	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
414	/// Network service to use to send messages and manage peers.
415	network: N,
416	/// Syncing service.
417	sync: S,
418	/// Receiver for syncing-related events.
419	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
420	/// Notification service.
421	notification_service: Box<dyn NotificationService>,
422	// All connected peers
423	peers: HashMap<PeerId, Peer>,
424	statement_store: Arc<dyn StatementStore>,
425	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
426	/// Maximum statements per second per peer.
427	statements_per_second: NonZeroU32,
428	/// Prometheus metrics.
429	metrics: Option<Metrics>,
430	/// Timeout for sending next statement batch during initial sync.
431	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
432	/// Pending initial syncs per peer.
433	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
434	/// Queue for round-robin processing of initial syncs.
435	initial_sync_peer_queue: VecDeque<PeerId>,
436	/// Tracks peers that connected while major sync was active and adds them to the reserved set
437	/// once sync ends
438	deferred_peers: HashSet<PeerId>,
439	/// Set to `true` when an incoming statement is dropped because `is_major_syncing()` is true
440	dropped_statements_during_sync: bool,
441	/// Peer scheduled for forced disconnect+reconnect to recover statements missed during sync
442	sync_recovery_peer: Option<PeerId>,
443	/// Fires when the `sync_recovery_peer` re-add delay has elapsed
444	sync_recovery_readd_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
445}
446
447/// Per-peer rate limiter using a token bucket algorithm.
448///
449/// The token bucket allows short bursts up to the per-second limit while enforcing
450/// the average rate over time.
451#[derive(Debug)]
452struct PeerRateLimiter {
453	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
454}
455
456impl PeerRateLimiter {
457	fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
458		let quota = Quota::per_second(statements_per_second).allow_burst(burst);
459		Self { limiter: RateLimiter::direct(quota) }
460	}
461
462	/// Check if receiving `count` statements would exceed the rate limit.
463	fn is_flooding(&self, count: usize) -> bool {
464		if count > u32::MAX as usize {
465			return true;
466		}
467
468		let Some(n) = NonZeroU32::new(count as u32) else {
469			return false;
470		};
471		!matches!(self.limiter.check_n(n), Ok(Ok(())))
472	}
473}
474
475/// Peer information
476#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
477#[derive(Debug)]
478pub struct Peer {
479	/// Holds a set of statements known to this peer.
480	known_statements: LruHashSet<Hash>,
481	/// Rate limiter for statement flooding protection.
482	rate_limiter: PeerRateLimiter,
483}
484
485/// Tracks pending initial sync state for a peer (hashes only, statements fetched on-demand).
486struct PendingInitialSync {
487	hashes: Vec<Hash>,
488	started_at: Instant,
489}
490
491/// Result of finding a sendable chunk of statements.
492enum ChunkResult {
493	/// Found a chunk that fits. Contains the end index (exclusive).
494	Send(usize),
495	/// First statement is oversized, skip it.
496	SkipOversized,
497}
498
499/// Result of sending a chunk of statements.
500enum SendChunkResult {
501	/// Successfully sent a chunk of N statements.
502	Sent(usize),
503	/// First statement was oversized and skipped.
504	Skipped,
505	/// Nothing to send.
506	Empty,
507	/// Send failed.
508	Failed,
509}
510
511/// Returns the maximum payload size for statement notifications.
512///
513/// This reserves space for encoding the length of the vector (Compact<u32>),
514/// ensuring the final encoded message fits within MAX_STATEMENT_NOTIFICATION_SIZE.
515fn max_statement_payload_size() -> usize {
516	MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len()
517}
518
519/// Find the largest chunk of statements starting from the beginning that fits
520/// within MAX_STATEMENT_NOTIFICATION_SIZE.
521///
522/// Uses an incremental approach: adds statements one by one until the limit is reached.
523/// This is efficient because we only compute sizes for statements we'll actually send
524/// in this chunk, rather than computing sizes for all statements upfront.
525fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
526	if statements.is_empty() {
527		return ChunkResult::Send(0);
528	}
529	let max_size = max_statement_payload_size();
530
531	// Incrementally add statements until we exceed the limit.
532	// This is efficient because we only compute sizes for statements in this chunk.
533	// accumulated_size is the sum of encoded sizes of all statements so far (without vec
534	// overhead).
535	let mut accumulated_size = 0;
536	let mut count = 0usize;
537
538	for stmt in &statements[0..] {
539		let stmt_size = stmt.encoded_size();
540		let new_count = count + 1;
541		// Compact encoding overhead for the new count
542		let new_total = accumulated_size + stmt_size;
543		if new_total > max_size {
544			break;
545		}
546
547		accumulated_size += stmt_size;
548		count = new_count;
549	}
550
551	// If we couldn't fit even a single statement, skip it.
552	if count == 0 {
553		ChunkResult::SkipOversized
554	} else {
555		ChunkResult::Send(count)
556	}
557}
558
559impl Peer {
560	/// Create a new peer for testing/benchmarking purposes.
561	#[cfg(any(test, feature = "test-helpers"))]
562	pub fn new_for_testing(
563		known_statements: LruHashSet<Hash>,
564		statements_per_second: NonZeroU32,
565		burst: NonZeroU32,
566	) -> Self {
567		Self { known_statements, rate_limiter: PeerRateLimiter::new(statements_per_second, burst) }
568	}
569}
570
571impl<N, S> StatementHandler<N, S>
572where
573	N: NetworkPeers + NetworkEventStream,
574	S: SyncEventStream + sp_consensus::SyncOracle,
575{
576	/// Create a new `StatementHandler` for testing/benchmarking purposes.
577	#[cfg(any(test, feature = "test-helpers"))]
578	pub fn new_for_testing(
579		protocol_name: ProtocolName,
580		notification_service: Box<dyn NotificationService>,
581		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
582		network: N,
583		sync: S,
584		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
585		peers: HashMap<PeerId, Peer>,
586		statement_store: Arc<dyn StatementStore>,
587		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
588		statements_per_second: NonZeroU32,
589	) -> Self {
590		Self {
591			protocol_name,
592			notification_service,
593			propagate_timeout,
594			pending_statements: FuturesUnordered::new(),
595			pending_statements_peers: HashMap::new(),
596			network,
597			sync,
598			sync_event_stream,
599			peers,
600			statement_store,
601			queue_sender,
602			statements_per_second,
603			metrics: None,
604			initial_sync_timeout: Box::pin(pending().fuse()),
605			pending_initial_syncs: HashMap::new(),
606			initial_sync_peer_queue: VecDeque::new(),
607			deferred_peers: HashSet::new(),
608			dropped_statements_during_sync: false,
609			sync_recovery_peer: None,
610			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
611		}
612	}
613
614	/// Get mutable access to pending statements for testing/benchmarking.
615	#[cfg(any(test, feature = "test-helpers"))]
616	pub fn pending_statements_mut(
617		&mut self,
618	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
619	{
620		&mut self.pending_statements
621	}
622
623	/// Turns the [`StatementHandler`] into a future that should run forever and not be
624	/// interrupted.
625	pub async fn run(mut self) {
626		loop {
627			futures::select_biased! {
628				_ = self.propagate_timeout.next() => {
629					self.propagate_statements().await;
630					self.metrics.as_ref().map(|metrics| {
631						metrics.pending_statements.set(self.pending_statements.len() as u64);
632					});
633				},
634				(hash, result) = self.pending_statements.select_next_some() => {
635					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
636						if let Some(result) = result {
637							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
638						}
639					} else {
640						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
641					}
642				},
643				sync_event = self.sync_event_stream.next() => {
644					if let Some(sync_event) = sync_event {
645						self.handle_sync_event(sync_event);
646					} else {
647						// Syncing has seemingly closed. Closing as well.
648						return;
649					}
650				}
651				event = self.notification_service.next_event().fuse() => {
652					if let Some(event) = event {
653						self.handle_notification_event(event).await
654					} else {
655						// `Notifications` has seemingly closed. Closing as well.
656						return
657					}
658				}
659				_ = &mut self.initial_sync_timeout => {
660					self.process_initial_sync_burst().await;
661					self.initial_sync_timeout =
662						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
663				},
664				_ = &mut self.sync_recovery_readd_timeout => {
665					self.try_readd_sync_recovery_peer();
666					self.sync_recovery_readd_timeout = Box::pin(pending().fuse());
667				},
668			}
669
670			if !self.sync.is_major_syncing() {
671				self.drain_deferred_peers();
672				self.start_sync_recovery();
673			}
674		}
675	}
676
677	/// Send a single chunk of statements to a peer.
678	async fn send_statement_chunk(
679		&mut self,
680		peer: &PeerId,
681		statements: &[&Statement],
682	) -> SendChunkResult {
683		match find_sendable_chunk(statements) {
684			ChunkResult::Send(0) => SendChunkResult::Empty,
685			ChunkResult::Send(chunk_end) => {
686				let chunk = &statements[..chunk_end];
687				let encoded = chunk.encode();
688				let bytes_to_send = encoded.len() as u64;
689
690				let sent_latency_timer =
691					self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
692				let send_result = timeout(
693					SEND_TIMEOUT,
694					self.notification_service.send_async_notification(peer, encoded),
695				)
696				.await;
697				drop(sent_latency_timer);
698
699				if let Err(e) = send_result {
700					log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
701					return SendChunkResult::Failed;
702				}
703
704				log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
705				self.metrics.as_ref().map(|metrics| {
706					metrics.propagated_statements.inc_by(chunk.len() as u64);
707					metrics.bytes_sent_total.inc_by(bytes_to_send);
708					metrics.propagated_statements_chunks.observe(chunk.len() as f64);
709				});
710				SendChunkResult::Sent(chunk_end)
711			},
712			ChunkResult::SkipOversized => {
713				log::warn!(target: LOG_TARGET, "Statement too large, skipping");
714				self.metrics.as_ref().map(|metrics| {
715					metrics.skipped_oversized_statements.inc();
716				});
717				SendChunkResult::Skipped
718			},
719		}
720	}
721
722	/// Add all peers that were deferred during major sync to the reserved set
723	fn drain_deferred_peers(&mut self) {
724		if self.deferred_peers.is_empty() {
725			return;
726		}
727
728		log::debug!(
729			target: LOG_TARGET,
730			"Major sync complete, adding {} deferred statement peers",
731			self.deferred_peers.len(),
732		);
733
734		let addrs: HashSet<multiaddr::Multiaddr> = self
735			.deferred_peers
736			.drain()
737			.map(|p| {
738				iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
739			})
740			.collect();
741
742		if let Err(err) = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs)
743		{
744			log::warn!(target: LOG_TARGET, "Failed to add deferred peers: {err}");
745		}
746	}
747
748	/// Pick one connected peer, remove it from the reserved set (forcing a disconnect), and
749	/// schedule it for re-adding after `SYNC_RECOVERY_READD_DELAY`. When the peer reconnects it
750	/// performs a fresh initial sync, delivering any statements that were dropped while the
751	/// `is_major_syncing` guard was active
752	fn start_sync_recovery(&mut self) {
753		if !self.dropped_statements_during_sync {
754			return;
755		}
756		self.dropped_statements_during_sync = false;
757
758		if self.sync_recovery_peer.is_some() {
759			return;
760		}
761
762		let Some(&peer_id) = self.peers.keys().choose(&mut rand::thread_rng()) else {
763			return;
764		};
765
766		log::trace!(
767			target: LOG_TARGET,
768			"Major sync complete, force-reconnecting {peer_id} for statement recovery",
769		);
770
771		if let Err(err) = self.network.remove_peers_from_reserved_set(
772			self.protocol_name.clone(),
773			iter::once(peer_id).collect(),
774		) {
775			log::warn!(target: LOG_TARGET, "Failed to remove peer {peer_id} for sync recovery: {err}");
776			return;
777		}
778
779		self.sync_recovery_peer = Some(peer_id);
780		self.sync_recovery_readd_timeout =
781			Box::pin(tokio::time::sleep(SYNC_RECOVERY_READD_DELAY).fuse());
782	}
783
784	/// Re-adds the sync-recovery peer to the reserved set after the backoff window has elapsed
785	fn try_readd_sync_recovery_peer(&mut self) {
786		let Some(peer_id) = self.sync_recovery_peer.take() else { return };
787		log::trace!(
788			target: LOG_TARGET,
789			"Re-adding {peer_id} to reserved set after sync recovery window",
790		);
791		let addr =
792			iter::once(multiaddr::Protocol::P2p(peer_id.into())).collect::<multiaddr::Multiaddr>();
793		if let Err(err) = self
794			.network
795			.add_peers_to_reserved_set(self.protocol_name.clone(), iter::once(addr).collect())
796		{
797			log::warn!(target: LOG_TARGET, "Failed to re-add sync recovery peer {peer_id}: {err}");
798		}
799	}
800
801	fn handle_sync_event(&mut self, event: SyncEvent) {
802		match event {
803			SyncEvent::PeerConnected(remote) => {
804				if self.sync.is_major_syncing() {
805					log::trace!(
806						target: LOG_TARGET,
807						"Major sync in progress, deferring connection to {remote}",
808					);
809					self.deferred_peers.insert(remote);
810					return;
811				}
812				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
813					.collect::<multiaddr::Multiaddr>();
814				let result = self.network.add_peers_to_reserved_set(
815					self.protocol_name.clone(),
816					iter::once(addr).collect(),
817				);
818				if let Err(err) = result {
819					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
820				}
821			},
822			SyncEvent::PeerDisconnected(remote) => {
823				if self.deferred_peers.remove(&remote) {
824					return;
825				}
826				let result = self.network.remove_peers_from_reserved_set(
827					self.protocol_name.clone(),
828					iter::once(remote).collect(),
829				);
830				if let Err(err) = result {
831					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
832				}
833			},
834		}
835	}
836
837	async fn handle_notification_event(&mut self, event: NotificationEvent) {
838		match event {
839			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
840				// Only accept peers whose role can be determined
841				let result = self
842					.network
843					.peer_role(peer, handshake)
844					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
845				let _ = result_tx.send(result);
846			},
847			NotificationEvent::NotificationStreamOpened { peer, .. } => {
848				let _was_in = self.peers.insert(
849					peer,
850					Peer {
851						known_statements: LruHashSet::new(
852							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
853						),
854						rate_limiter: PeerRateLimiter::new(
855							self.statements_per_second,
856							NonZeroU32::new(
857								self.statements_per_second.get() *
858									config::STATEMENTS_BURST_COEFFICIENT,
859							)
860							.expect("burst capacity is nonzero"),
861						),
862					},
863				);
864				debug_assert!(_was_in.is_none());
865
866				self.metrics.as_ref().map(|metrics| {
867					metrics.peers_connected.set(self.peers.len() as u64);
868				});
869
870				if !self.sync.is_major_syncing() {
871					let hashes = self.statement_store.statement_hashes();
872					if !hashes.is_empty() {
873						self.pending_initial_syncs.insert(
874							peer,
875							PendingInitialSync { hashes, started_at: Instant::now() },
876						);
877						self.initial_sync_peer_queue.push_back(peer);
878						self.metrics.as_ref().map(|metrics| {
879							metrics.initial_sync_peers_active.inc();
880						});
881					}
882				}
883			},
884			NotificationEvent::NotificationStreamClosed { peer } => {
885				let _peer = self.peers.remove(&peer);
886				debug_assert!(_peer.is_some());
887				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
888					self.metrics.as_ref().map(|metrics| {
889						metrics.initial_sync_peers_active.dec();
890						metrics
891							.initial_sync_duration_seconds
892							.observe(pending.started_at.elapsed().as_secs_f64());
893					});
894				}
895				self.initial_sync_peer_queue.retain(|p| *p != peer);
896				self.metrics.as_ref().map(|metrics| {
897					metrics.peers_connected.set(self.peers.len() as u64);
898				});
899			},
900			NotificationEvent::NotificationReceived { peer, notification } => {
901				let bytes_received = notification.len() as u64;
902				self.metrics.as_ref().map(|metrics| {
903					metrics.bytes_received_total.inc_by(bytes_received);
904				});
905
906				// Accept statements only when node is not major syncing
907				if self.sync.is_major_syncing() {
908					log::trace!(
909						target: LOG_TARGET,
910						"{peer}: Ignoring statements while major syncing or offline"
911					);
912					self.dropped_statements_during_sync = true;
913					return;
914				}
915
916				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
917					self.on_statements(peer, statements);
918				} else {
919					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
920				}
921			},
922		}
923	}
924
925	/// Called when peer sends us new statements
926	#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
927	pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
928		log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
929
930		self.metrics.as_ref().map(|metrics| {
931			metrics.statements_received.inc_by(statements.len() as u64);
932		});
933
934		if let Some(ref mut peer) = self.peers.get_mut(&who) {
935			if peer.rate_limiter.is_flooding(statements.len()) {
936				log::warn!(
937					target: LOG_TARGET,
938					"Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
939					who,
940					self.statements_per_second
941				);
942
943				self.network.report_peer(who, rep::STATEMENT_FLOODING);
944				self.network.disconnect_peer(who, self.protocol_name.clone());
945				if let Some(ref metrics) = self.metrics {
946					metrics.statement_flooding_detected.inc();
947				}
948
949				// Clean up peer state immediately
950				self.peers.remove(&who);
951				self.pending_initial_syncs.remove(&who);
952				self.initial_sync_peer_queue.retain(|p| *p != who);
953
954				return;
955			}
956
957			let mut statements_left = statements.len() as u64;
958			for s in statements {
959				if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
960					log::debug!(
961						target: LOG_TARGET,
962						"Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
963						statements_left,
964						MAX_PENDING_STATEMENTS,
965					);
966					self.metrics.as_ref().map(|metrics| {
967						metrics.ignored_statements.inc_by(statements_left);
968					});
969					break;
970				}
971
972				let hash = s.hash();
973				peer.known_statements.insert(hash);
974
975				if self.statement_store.has_statement(&hash) {
976					self.metrics.as_ref().map(|metrics| {
977						metrics.known_statements_received.inc();
978					});
979
980					if let Some(peers) = self.pending_statements_peers.get(&hash) {
981						if peers.contains(&who) {
982							log::trace!(
983								target: LOG_TARGET,
984								"Already received the statement from the same peer {who}.",
985							);
986							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
987						}
988					}
989					continue;
990				}
991
992				self.network.report_peer(who, rep::ANY_STATEMENT);
993
994				match self.pending_statements_peers.entry(hash) {
995					Entry::Vacant(entry) => {
996						let (completion_sender, completion_receiver) = oneshot::channel();
997						match self.queue_sender.try_send((s, completion_sender)) {
998							Ok(()) => {
999								self.pending_statements.push(
1000									async move {
1001										let res = completion_receiver.await;
1002										(hash, res.ok())
1003									}
1004									.boxed(),
1005								);
1006								entry.insert(HashSet::from_iter([who]));
1007							},
1008							Err(async_channel::TrySendError::Full(_)) => {
1009								log::debug!(
1010									target: LOG_TARGET,
1011									"Dropped statement because validation channel is full",
1012								);
1013							},
1014							Err(async_channel::TrySendError::Closed(_)) => {
1015								log::trace!(
1016									target: LOG_TARGET,
1017									"Dropped statement because validation channel is closed",
1018								);
1019							},
1020						}
1021					},
1022					Entry::Occupied(mut entry) => {
1023						if !entry.get_mut().insert(who) {
1024							// Already received this from the same peer.
1025							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
1026						}
1027					},
1028				}
1029
1030				statements_left -= 1;
1031			}
1032		}
1033	}
1034
1035	fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
1036		match import {
1037			SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
1038			SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
1039			SubmitResult::KnownExpired => {},
1040			SubmitResult::Rejected(_) => {},
1041			SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
1042			SubmitResult::InternalError(_) => {},
1043		}
1044	}
1045
1046	/// Propagate one statement.
1047	pub async fn propagate_statement(&mut self, hash: &Hash) {
1048		// Accept statements only when node is not major syncing
1049		if self.sync.is_major_syncing() {
1050			return;
1051		}
1052
1053		log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
1054		if let Ok(Some(statement)) = self.statement_store.statement(hash) {
1055			self.do_propagate_statements(&[(*hash, statement)]).await;
1056		}
1057	}
1058
1059	/// Propagate the given `statements` to the given `peer`.
1060	///
1061	/// Internally filters `statements` to only send unknown statements to the peer.
1062	async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
1063		let Some(peer) = self.peers.get_mut(who) else {
1064			return;
1065		};
1066
1067		let to_send: Vec<_> = statements
1068			.iter()
1069			.filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
1070			.collect();
1071
1072		log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
1073
1074		if to_send.is_empty() {
1075			return;
1076		}
1077
1078		self.send_statements_in_chunks(who, &to_send).await;
1079	}
1080
1081	/// Send statements to a peer in chunks, respecting the maximum notification size.
1082	async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
1083		let mut offset = 0;
1084		while offset < statements.len() {
1085			match self.send_statement_chunk(who, &statements[offset..]).await {
1086				SendChunkResult::Sent(chunk_end) => {
1087					offset += chunk_end;
1088				},
1089				SendChunkResult::Skipped => {
1090					offset += 1;
1091				},
1092				SendChunkResult::Empty | SendChunkResult::Failed => return,
1093			}
1094		}
1095	}
1096
1097	async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
1098		log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
1099		let peers: Vec<_> = self.peers.keys().copied().collect();
1100		for who in peers {
1101			log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
1102			self.send_statements_to_peer(&who, statements).await;
1103		}
1104		log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
1105	}
1106
1107	/// Call when we must propagate ready statements to peers.
1108	async fn propagate_statements(&mut self) {
1109		// Send out statements only when node is not major syncing
1110		if self.sync.is_major_syncing() {
1111			return;
1112		}
1113
1114		let Ok(statements) = self.statement_store.take_recent_statements() else { return };
1115		if !statements.is_empty() {
1116			self.do_propagate_statements(&statements).await;
1117		}
1118	}
1119
1120	/// Record initial sync completion metrics for a peer being removed.
1121	fn record_initial_sync_completion(&self, started_at: Instant) {
1122		self.metrics.as_ref().map(|metrics| {
1123			metrics.initial_sync_peers_active.dec();
1124			metrics
1125				.initial_sync_duration_seconds
1126				.observe(started_at.elapsed().as_secs_f64());
1127		});
1128	}
1129
1130	/// Process one batch of initial sync for the next peer in the queue (round-robin).
1131	async fn process_initial_sync_burst(&mut self) {
1132		if self.sync.is_major_syncing() {
1133			return;
1134		}
1135
1136		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
1137			return;
1138		};
1139
1140		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
1141			return;
1142		};
1143
1144		self.metrics.as_ref().map(|metrics| {
1145			metrics.initial_sync_bursts_total.inc();
1146		});
1147
1148		if entry.get().hashes.is_empty() {
1149			let started_at = entry.get().started_at;
1150			entry.remove();
1151			self.record_initial_sync_completion(started_at);
1152			return;
1153		}
1154
1155		// Fetch statements up to max_statement_payload_size (reserves space for vec encoding)
1156		let max_size = max_statement_payload_size();
1157		let mut accumulated_size = 0;
1158		let (statements, processed) = match self.statement_store.statements_by_hashes(
1159			&entry.get().hashes,
1160			&mut |_hash, encoded, _stmt| {
1161				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
1162					return FilterDecision::Abort;
1163				}
1164				accumulated_size += encoded.len();
1165				FilterDecision::Take
1166			},
1167		) {
1168			Ok(r) => r,
1169			Err(e) => {
1170				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
1171				let started_at = entry.get().started_at;
1172				entry.remove();
1173				self.record_initial_sync_completion(started_at);
1174				return;
1175			},
1176		};
1177
1178		// Drain processed hashes and check if more remain
1179		entry.get_mut().hashes.drain(..processed);
1180		let has_more = !entry.get().hashes.is_empty();
1181		drop(entry);
1182
1183		// Send statements (already sized to fit in one message)
1184		let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
1185		match self.send_statement_chunk(&peer_id, &to_send).await {
1186			SendChunkResult::Failed => {
1187				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1188					self.record_initial_sync_completion(pending.started_at);
1189				}
1190				return;
1191			},
1192			SendChunkResult::Sent(sent) => {
1193				debug_assert_eq!(to_send.len(), sent);
1194				self.metrics.as_ref().map(|metrics| {
1195					metrics.initial_sync_statements_sent.inc_by(sent as u64);
1196				});
1197				// Mark statements as known
1198				if let Some(peer) = self.peers.get_mut(&peer_id) {
1199					for (hash, _) in &statements {
1200						peer.known_statements.insert(*hash);
1201					}
1202				}
1203			},
1204			SendChunkResult::Empty | SendChunkResult::Skipped => {},
1205		}
1206
1207		// Re-queue if more hashes remain
1208		if has_more {
1209			self.initial_sync_peer_queue.push_back(peer_id);
1210		} else {
1211			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1212				self.record_initial_sync_completion(pending.started_at);
1213			}
1214		}
1215	}
1216}
1217
1218#[cfg(test)]
1219mod tests {
1220
1221	use super::*;
1222	use std::sync::{
1223		atomic::{AtomicBool, Ordering},
1224		Mutex,
1225	};
1226
1227	#[derive(Clone)]
1228	struct TestNetwork {
1229		reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
1230		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
1231		added_reserved: Arc<Mutex<Vec<HashSet<sc_network::Multiaddr>>>>,
1232		removed_reserved: Arc<Mutex<Vec<Vec<PeerId>>>>,
1233	}
1234
1235	impl TestNetwork {
1236		fn new() -> Self {
1237			Self {
1238				reported_peers: Arc::new(Mutex::new(Vec::new())),
1239				disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1240				added_reserved: Arc::new(Mutex::new(Vec::new())),
1241				removed_reserved: Arc::new(Mutex::new(Vec::new())),
1242			}
1243		}
1244
1245		fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
1246			self.reported_peers.lock().unwrap().clone()
1247		}
1248
1249		fn get_disconnected_peers(&self) -> Vec<PeerId> {
1250			self.disconnected_peers.lock().unwrap().clone()
1251		}
1252
1253		fn get_added_reserved(&self) -> Vec<HashSet<sc_network::Multiaddr>> {
1254			self.added_reserved.lock().unwrap().clone()
1255		}
1256
1257		fn get_removed_reserved(&self) -> Vec<Vec<PeerId>> {
1258			self.removed_reserved.lock().unwrap().clone()
1259		}
1260	}
1261
1262	#[async_trait::async_trait]
1263	impl NetworkPeers for TestNetwork {
1264		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
1265			unimplemented!()
1266		}
1267
1268		fn set_authorized_only(&self, _: bool) {
1269			unimplemented!()
1270		}
1271
1272		fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
1273			unimplemented!()
1274		}
1275
1276		fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
1277			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
1278		}
1279
1280		fn peer_reputation(&self, _: &PeerId) -> i32 {
1281			unimplemented!()
1282		}
1283
1284		fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
1285			self.disconnected_peers.lock().unwrap().push(peer);
1286		}
1287
1288		fn accept_unreserved_peers(&self) {
1289			unimplemented!()
1290		}
1291
1292		fn deny_unreserved_peers(&self) {
1293			unimplemented!()
1294		}
1295
1296		fn add_reserved_peer(
1297			&self,
1298			_: sc_network::config::MultiaddrWithPeerId,
1299		) -> Result<(), String> {
1300			unimplemented!()
1301		}
1302
1303		fn remove_reserved_peer(&self, _: PeerId) {
1304			unimplemented!()
1305		}
1306
1307		fn set_reserved_peers(
1308			&self,
1309			_: sc_network::ProtocolName,
1310			_: std::collections::HashSet<sc_network::Multiaddr>,
1311		) -> Result<(), String> {
1312			unimplemented!()
1313		}
1314
1315		fn add_peers_to_reserved_set(
1316			&self,
1317			_: sc_network::ProtocolName,
1318			addrs: std::collections::HashSet<sc_network::Multiaddr>,
1319		) -> Result<(), String> {
1320			self.added_reserved.lock().unwrap().push(addrs);
1321			Ok(())
1322		}
1323
1324		fn remove_peers_from_reserved_set(
1325			&self,
1326			_: sc_network::ProtocolName,
1327			peers: Vec<PeerId>,
1328		) -> Result<(), String> {
1329			self.removed_reserved.lock().unwrap().push(peers);
1330			Ok(())
1331		}
1332
1333		fn sync_num_connected(&self) -> usize {
1334			unimplemented!()
1335		}
1336
1337		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
1338			unimplemented!()
1339		}
1340
1341		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
1342			unimplemented!();
1343		}
1344	}
1345
1346	#[derive(Clone)]
1347	struct TestSync {
1348		major_syncing: Arc<AtomicBool>,
1349	}
1350
1351	impl TestSync {
1352		fn new() -> Self {
1353			Self { major_syncing: Arc::new(AtomicBool::new(false)) }
1354		}
1355
1356		fn with_syncing(initial: bool) -> (Self, Arc<AtomicBool>) {
1357			let flag = Arc::new(AtomicBool::new(initial));
1358			(Self { major_syncing: flag.clone() }, flag)
1359		}
1360	}
1361
1362	impl SyncEventStream for TestSync {
1363		fn event_stream(
1364			&self,
1365			_name: &'static str,
1366		) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
1367			Box::pin(futures::stream::pending())
1368		}
1369	}
1370
1371	impl sp_consensus::SyncOracle for TestSync {
1372		fn is_major_syncing(&self) -> bool {
1373			self.major_syncing.load(Ordering::Relaxed)
1374		}
1375
1376		fn is_offline(&self) -> bool {
1377			unimplemented!()
1378		}
1379	}
1380
1381	impl NetworkEventStream for TestNetwork {
1382		fn event_stream(
1383			&self,
1384			_name: &'static str,
1385		) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
1386			unimplemented!()
1387		}
1388	}
1389
1390	#[derive(Debug, Clone)]
1391	struct TestNotificationService {
1392		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
1393	}
1394
1395	impl TestNotificationService {
1396		fn new() -> Self {
1397			Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1398		}
1399
1400		fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1401			self.sent_notifications.lock().unwrap().clone()
1402		}
1403	}
1404
1405	#[async_trait::async_trait]
1406	impl NotificationService for TestNotificationService {
1407		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1408			unimplemented!()
1409		}
1410
1411		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1412			unimplemented!()
1413		}
1414
1415		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
1416			self.sent_notifications.lock().unwrap().push((*peer, notification));
1417		}
1418
1419		async fn send_async_notification(
1420			&mut self,
1421			peer: &PeerId,
1422			notification: Vec<u8>,
1423		) -> Result<(), sc_network::error::Error> {
1424			self.sent_notifications.lock().unwrap().push((*peer, notification));
1425			Ok(())
1426		}
1427
1428		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1429			unimplemented!()
1430		}
1431
1432		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1433			unimplemented!()
1434		}
1435
1436		async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
1437			None
1438		}
1439
1440		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
1441			unimplemented!()
1442		}
1443
1444		fn protocol(&self) -> &sc_network::types::ProtocolName {
1445			unimplemented!()
1446		}
1447
1448		fn message_sink(
1449			&self,
1450			_peer: &PeerId,
1451		) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
1452			unimplemented!()
1453		}
1454	}
1455
1456	#[derive(Clone)]
1457	struct TestStatementStore {
1458		statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1459		recent_statements:
1460			Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1461	}
1462
1463	impl TestStatementStore {
1464		fn new() -> Self {
1465			Self { statements: Default::default(), recent_statements: Default::default() }
1466		}
1467	}
1468
1469	impl StatementStore for TestStatementStore {
1470		fn statements(
1471			&self,
1472		) -> sp_statement_store::Result<
1473			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1474		> {
1475			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
1476		}
1477
1478		fn take_recent_statements(
1479			&self,
1480		) -> sp_statement_store::Result<
1481			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1482		> {
1483			Ok(self.recent_statements.lock().unwrap().drain().collect())
1484		}
1485
1486		fn statement(
1487			&self,
1488			_hash: &sp_statement_store::Hash,
1489		) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
1490			unimplemented!()
1491		}
1492
1493		fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
1494			self.statements.lock().unwrap().contains_key(hash)
1495		}
1496
1497		fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
1498			self.statements.lock().unwrap().keys().cloned().collect()
1499		}
1500
1501		fn statements_by_hashes(
1502			&self,
1503			hashes: &[sp_statement_store::Hash],
1504			filter: &mut dyn FnMut(
1505				&sp_statement_store::Hash,
1506				&[u8],
1507				&sp_statement_store::Statement,
1508			) -> FilterDecision,
1509		) -> sp_statement_store::Result<(
1510			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1511			usize,
1512		)> {
1513			let statements = self.statements.lock().unwrap();
1514			let mut result = Vec::new();
1515			let mut processed = 0;
1516			for hash in hashes {
1517				let Some(stmt) = statements.get(hash) else {
1518					processed += 1;
1519					continue;
1520				};
1521				let encoded = stmt.encode();
1522				match filter(hash, &encoded, stmt) {
1523					FilterDecision::Skip => {
1524						processed += 1;
1525					},
1526					FilterDecision::Take => {
1527						processed += 1;
1528						result.push((*hash, stmt.clone()));
1529					},
1530					FilterDecision::Abort => break,
1531				}
1532			}
1533			Ok((result, processed))
1534		}
1535
1536		fn broadcasts(
1537			&self,
1538			_match_all_topics: &[sp_statement_store::Topic],
1539		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1540			unimplemented!()
1541		}
1542
1543		fn posted(
1544			&self,
1545			_match_all_topics: &[sp_statement_store::Topic],
1546			_dest: [u8; 32],
1547		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1548			unimplemented!()
1549		}
1550
1551		fn posted_clear(
1552			&self,
1553			_match_all_topics: &[sp_statement_store::Topic],
1554			_dest: [u8; 32],
1555		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1556			unimplemented!()
1557		}
1558
1559		fn broadcasts_stmt(
1560			&self,
1561			_match_all_topics: &[sp_statement_store::Topic],
1562		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1563			unimplemented!()
1564		}
1565
1566		fn posted_stmt(
1567			&self,
1568			_match_all_topics: &[sp_statement_store::Topic],
1569			_dest: [u8; 32],
1570		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1571			unimplemented!()
1572		}
1573
1574		fn posted_clear_stmt(
1575			&self,
1576			_match_all_topics: &[sp_statement_store::Topic],
1577			_dest: [u8; 32],
1578		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1579			unimplemented!()
1580		}
1581
1582		fn submit(
1583			&self,
1584			_statement: sp_statement_store::Statement,
1585			_source: sp_statement_store::StatementSource,
1586		) -> sp_statement_store::SubmitResult {
1587			unimplemented!()
1588		}
1589
1590		fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
1591			unimplemented!()
1592		}
1593
1594		fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
1595			unimplemented!()
1596		}
1597	}
1598
1599	fn build_handler() -> (
1600		StatementHandler<TestNetwork, TestSync>,
1601		TestStatementStore,
1602		TestNetwork,
1603		TestNotificationService,
1604		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
1605	) {
1606		let statement_store = TestStatementStore::new();
1607		let (queue_sender, queue_receiver) = async_channel::bounded(2);
1608		let network = TestNetwork::new();
1609		let notification_service = TestNotificationService::new();
1610		let peer_id = PeerId::random();
1611		let mut peers = HashMap::new();
1612		peers.insert(
1613			peer_id,
1614			Peer {
1615				known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
1616				rate_limiter: PeerRateLimiter::new(
1617					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1618						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1619					NonZeroU32::new(
1620						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
1621					)
1622					.expect("burst capacity is nonzero"),
1623				),
1624			},
1625		);
1626
1627		let handler = StatementHandler {
1628			protocol_name: "/statement/1".into(),
1629			notification_service: Box::new(notification_service.clone()),
1630			propagate_timeout: (Box::pin(futures::stream::pending())
1631				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1632				.fuse(),
1633			pending_statements: FuturesUnordered::new(),
1634			pending_statements_peers: HashMap::new(),
1635			network: network.clone(),
1636			sync: TestSync::new(),
1637			sync_event_stream: (Box::pin(futures::stream::pending())
1638				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1639				.fuse(),
1640			peers,
1641			statement_store: Arc::new(statement_store.clone()),
1642			queue_sender,
1643			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1644				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1645			metrics: None,
1646			initial_sync_timeout: Box::pin(futures::future::pending()),
1647			pending_initial_syncs: HashMap::new(),
1648			initial_sync_peer_queue: VecDeque::new(),
1649			deferred_peers: HashSet::new(),
1650			dropped_statements_during_sync: false,
1651			sync_recovery_peer: None,
1652			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
1653		};
1654		(handler, statement_store, network, notification_service, queue_receiver)
1655	}
1656
1657	#[tokio::test]
1658	async fn test_skips_processing_statements_that_already_in_store() {
1659		let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
1660			build_handler();
1661
1662		let mut statement1 = Statement::new();
1663		statement1.set_plain_data(b"statement1".to_vec());
1664		let hash1 = statement1.hash();
1665
1666		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
1667
1668		let mut statement2 = Statement::new();
1669		statement2.set_plain_data(b"statement2".to_vec());
1670		let hash2 = statement2.hash();
1671
1672		let peer_id = *handler.peers.keys().next().unwrap();
1673
1674		handler.on_statements(peer_id, vec![statement1, statement2]);
1675
1676		let to_submit = queue_receiver.try_recv();
1677		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
1678
1679		let no_more = queue_receiver.try_recv();
1680		assert!(no_more.is_err(), "Expected only one statement to be queued");
1681	}
1682
1683	#[tokio::test]
1684	async fn test_reports_for_duplicate_statements() {
1685		let (mut handler, statement_store, network, _notification_service, queue_receiver) =
1686			build_handler();
1687
1688		let peer_id = *handler.peers.keys().next().unwrap();
1689
1690		let mut statement1 = Statement::new();
1691		statement1.set_plain_data(b"statement1".to_vec());
1692
1693		handler.on_statements(peer_id, vec![statement1.clone()]);
1694		{
1695			// Manually process statements submission
1696			let (s, _) = queue_receiver.try_recv().unwrap();
1697			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1698			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1699		}
1700
1701		handler.on_statements(peer_id, vec![statement1]);
1702
1703		let reports = network.get_reports();
1704		assert_eq!(
1705			reports,
1706			vec![
1707				(peer_id, rep::ANY_STATEMENT),        // Report for first statement
1708				(peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement
1709				(peer_id, rep::DUPLICATE_STATEMENT)   // Report for duplicate statement
1710			],
1711			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1712			reports
1713		);
1714	}
1715
1716	#[tokio::test]
1717	async fn test_splits_large_batches_into_smaller_chunks() {
1718		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1719			build_handler();
1720
1721		let num_statements = 30;
1722		let statement_size = 100 * 1024; // 100KB per statement
1723		for i in 0..num_statements {
1724			let mut statement = Statement::new();
1725			let mut data = vec![0u8; statement_size];
1726			data[0] = i as u8;
1727			statement.set_plain_data(data);
1728			let hash = statement.hash();
1729			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1730		}
1731
1732		handler.propagate_statements().await;
1733
1734		let sent = notification_service.get_sent_notifications();
1735		let mut total_statements_sent = 0;
1736		assert!(
1737			sent.len() == 3,
1738			"Expected batch to be split into 3 chunks, but got {} chunks",
1739			sent.len()
1740		);
1741		for (_peer, notification) in sent.iter() {
1742			assert!(
1743				notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1744				"Notification size {} exceeds limit {}",
1745				notification.len(),
1746				MAX_STATEMENT_NOTIFICATION_SIZE
1747			);
1748			if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1749				total_statements_sent += stmts.len();
1750			}
1751		}
1752
1753		assert_eq!(
1754			total_statements_sent, num_statements,
1755			"Expected all {} statements to be sent, but only {} were sent",
1756			num_statements, total_statements_sent
1757		);
1758	}
1759
1760	#[tokio::test]
1761	async fn test_skips_only_oversized_statements() {
1762		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1763			build_handler();
1764
1765		let mut statement1 = Statement::new();
1766		statement1.set_plain_data(vec![1u8; 100]);
1767		let hash1 = statement1.hash();
1768		statement_store
1769			.recent_statements
1770			.lock()
1771			.unwrap()
1772			.insert(hash1, statement1.clone());
1773
1774		let mut oversized1 = Statement::new();
1775		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1776		let hash_oversized1 = oversized1.hash();
1777		statement_store
1778			.recent_statements
1779			.lock()
1780			.unwrap()
1781			.insert(hash_oversized1, oversized1);
1782
1783		let mut statement2 = Statement::new();
1784		statement2.set_plain_data(vec![3u8; 100]);
1785		let hash2 = statement2.hash();
1786		statement_store
1787			.recent_statements
1788			.lock()
1789			.unwrap()
1790			.insert(hash2, statement2.clone());
1791
1792		let mut oversized2 = Statement::new();
1793		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1794		let hash_oversized2 = oversized2.hash();
1795		statement_store
1796			.recent_statements
1797			.lock()
1798			.unwrap()
1799			.insert(hash_oversized2, oversized2);
1800
1801		let mut statement3 = Statement::new();
1802		statement3.set_plain_data(vec![5u8; 100]);
1803		let hash3 = statement3.hash();
1804		statement_store
1805			.recent_statements
1806			.lock()
1807			.unwrap()
1808			.insert(hash3, statement3.clone());
1809
1810		handler.propagate_statements().await;
1811
1812		let sent = notification_service.get_sent_notifications();
1813
1814		let mut sent_hashes = sent
1815			.iter()
1816			.flat_map(|(_peer, notification)| {
1817				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1818			})
1819			.map(|s| s.hash())
1820			.collect::<Vec<_>>();
1821		sent_hashes.sort();
1822		let mut expected_hashes = vec![hash1, hash2, hash3];
1823		expected_hashes.sort();
1824		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1825	}
1826
1827	fn build_handler_no_peers() -> (
1828		StatementHandler<TestNetwork, TestSync>,
1829		TestStatementStore,
1830		TestNetwork,
1831		TestNotificationService,
1832	) {
1833		let statement_store = TestStatementStore::new();
1834		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
1835		let network = TestNetwork::new();
1836		let notification_service = TestNotificationService::new();
1837
1838		let handler = StatementHandler {
1839			protocol_name: "/statement/1".into(),
1840			notification_service: Box::new(notification_service.clone()),
1841			propagate_timeout: (Box::pin(futures::stream::pending())
1842				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1843				.fuse(),
1844			pending_statements: FuturesUnordered::new(),
1845			pending_statements_peers: HashMap::new(),
1846			network: network.clone(),
1847			sync: TestSync::new(),
1848			sync_event_stream: (Box::pin(futures::stream::pending())
1849				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1850				.fuse(),
1851			peers: HashMap::new(),
1852			statement_store: Arc::new(statement_store.clone()),
1853			queue_sender,
1854			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1855				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1856			metrics: None,
1857			initial_sync_timeout: Box::pin(futures::future::pending()),
1858			pending_initial_syncs: HashMap::new(),
1859			initial_sync_peer_queue: VecDeque::new(),
1860			deferred_peers: HashSet::new(),
1861			dropped_statements_during_sync: false,
1862			sync_recovery_peer: None,
1863			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
1864		};
1865		(handler, statement_store, network, notification_service)
1866	}
1867
1868	#[tokio::test]
1869	async fn test_initial_sync_burst_single_peer() {
1870		let (mut handler, statement_store, _network, notification_service) =
1871			build_handler_no_peers();
1872
1873		// Create 20MB of statements (200 statements x 100KB each)
1874		// Using 100KB ensures ~10 statements per 1MB batch, requiring ~20 bursts
1875		let num_statements = 200;
1876		let statement_size = 100 * 1024; // 100KB per statement
1877		let mut expected_hashes = Vec::new();
1878		for i in 0..num_statements {
1879			let mut statement = Statement::new();
1880			let mut data = vec![0u8; statement_size];
1881			// Use multiple bytes for uniqueness since we have >255 statements
1882			data[0] = (i % 256) as u8;
1883			data[1] = (i / 256) as u8;
1884			statement.set_plain_data(data);
1885			let hash = statement.hash();
1886			expected_hashes.push(hash);
1887			statement_store.statements.lock().unwrap().insert(hash, statement);
1888		}
1889
1890		// Setup peer and simulate connection
1891		let peer_id = PeerId::random();
1892
1893		handler
1894			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
1895				peer: peer_id,
1896				direction: sc_network::service::traits::Direction::Inbound,
1897				handshake: vec![],
1898				negotiated_fallback: None,
1899			})
1900			.await;
1901
1902		// Verify peer was added and initial sync was queued
1903		assert!(handler.peers.contains_key(&peer_id));
1904		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
1905		assert_eq!(handler.initial_sync_peer_queue.len(), 1);
1906
1907		// Process bursts until all statements are sent
1908		let mut burst_count = 0;
1909		while handler.pending_initial_syncs.contains_key(&peer_id) {
1910			handler.process_initial_sync_burst().await;
1911			burst_count += 1;
1912			// Safety limit
1913			assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
1914		}
1915
1916		// Verify multiple bursts were needed
1917		// With 200 statements x 100KB each and ~1MB per batch, we expect many bursts
1918		assert!(
1919			burst_count >= 10,
1920			"Expected multiple bursts for 200 statements of 100KB each, got {}",
1921			burst_count
1922		);
1923
1924		// Verify all statements were sent
1925		let sent = notification_service.get_sent_notifications();
1926		let mut sent_hashes: Vec<_> = sent
1927			.iter()
1928			.flat_map(|(peer, notification)| {
1929				assert_eq!(*peer, peer_id);
1930				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1931			})
1932			.map(|s| s.hash())
1933			.collect();
1934		sent_hashes.sort();
1935		expected_hashes.sort();
1936
1937		assert_eq!(
1938			sent_hashes.len(),
1939			expected_hashes.len(),
1940			"Expected {} statements to be sent, got {}",
1941			expected_hashes.len(),
1942			sent_hashes.len()
1943		);
1944		assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
1945
1946		// Verify cleanup
1947		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
1948		assert!(handler.initial_sync_peer_queue.is_empty());
1949	}
1950
1951	#[tokio::test]
1952	async fn test_initial_sync_burst_multiple_peers_round_robin() {
1953		let (mut handler, statement_store, _network, notification_service) =
1954			build_handler_no_peers();
1955
1956		// Create 20MB of statements (200 statements x 100KB each)
1957		let num_statements = 200;
1958		let statement_size = 100 * 1024; // 100KB per statement
1959		let mut expected_hashes = Vec::new();
1960		for i in 0..num_statements {
1961			let mut statement = Statement::new();
1962			let mut data = vec![0u8; statement_size];
1963			data[0] = (i % 256) as u8;
1964			data[1] = (i / 256) as u8;
1965			statement.set_plain_data(data);
1966			let hash = statement.hash();
1967			expected_hashes.push(hash);
1968			statement_store.statements.lock().unwrap().insert(hash, statement);
1969		}
1970
1971		// Setup 3 peers and simulate connections
1972		let peer1 = PeerId::random();
1973		let peer2 = PeerId::random();
1974		let peer3 = PeerId::random();
1975
1976		// Connect peers
1977		for peer in [peer1, peer2, peer3] {
1978			handler
1979				.handle_notification_event(NotificationEvent::NotificationStreamOpened {
1980					peer,
1981					direction: sc_network::service::traits::Direction::Inbound,
1982					handshake: vec![],
1983					negotiated_fallback: None,
1984				})
1985				.await;
1986		}
1987
1988		// Verify all peers were added and initial syncs were queued
1989		assert_eq!(handler.peers.len(), 3);
1990		assert_eq!(handler.pending_initial_syncs.len(), 3);
1991		assert_eq!(handler.initial_sync_peer_queue.len(), 3);
1992
1993		// Track which peer was processed on each burst for round-robin verification
1994		let mut peer_burst_order = Vec::new();
1995		let mut burst_count = 0;
1996
1997		while !handler.pending_initial_syncs.is_empty() {
1998			// Record which peer will be processed next
1999			if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
2000				peer_burst_order.push(next_peer);
2001			}
2002			handler.process_initial_sync_burst().await;
2003			burst_count += 1;
2004			// Safety limit
2005			assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
2006		}
2007
2008		// Verify multiple bursts were needed
2009		// With 3 peers and many bursts per peer, we expect many bursts total
2010		assert!(
2011			burst_count >= 30,
2012			"Expected many bursts for 3 peers with 200 statements each, got {}",
2013			burst_count
2014		);
2015
2016		// Verify round-robin pattern in first 9 bursts (3 peers x 3 rounds)
2017		assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
2018		// First round
2019		assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
2020		assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
2021		assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
2022		// Second round
2023		assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
2024		assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
2025		assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
2026
2027		// Verify all peers received all statements
2028		let sent = notification_service.get_sent_notifications();
2029		let mut peer1_hashes: Vec<_> = sent
2030			.iter()
2031			.filter(|(peer, _)| *peer == peer1)
2032			.flat_map(|(_, notification)| {
2033				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2034			})
2035			.map(|s| s.hash())
2036			.collect();
2037		let mut peer2_hashes: Vec<_> = sent
2038			.iter()
2039			.filter(|(peer, _)| *peer == peer2)
2040			.flat_map(|(_, notification)| {
2041				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2042			})
2043			.map(|s| s.hash())
2044			.collect();
2045		let mut peer3_hashes: Vec<_> = sent
2046			.iter()
2047			.filter(|(peer, _)| *peer == peer3)
2048			.flat_map(|(_, notification)| {
2049				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2050			})
2051			.map(|s| s.hash())
2052			.collect();
2053
2054		peer1_hashes.sort();
2055		peer2_hashes.sort();
2056		peer3_hashes.sort();
2057		expected_hashes.sort();
2058
2059		assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
2060		assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
2061		assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
2062
2063		// Verify cleanup
2064		assert!(handler.pending_initial_syncs.is_empty());
2065		assert!(handler.initial_sync_peer_queue.is_empty());
2066	}
2067
2068	#[tokio::test]
2069	async fn test_send_statements_in_chunks_exact_max_size() {
2070		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
2071			build_handler();
2072
2073		// Calculate the data sizes so that 100 statements together exactly fill max_size.
2074		// This tests that all 100 statements fit in a single notification.
2075		//
2076		// The limit check in find_sendable_chunk is:
2077		//   max_size = MAX_STATEMENT_NOTIFICATION_SIZE - Compact::<u32>::max_encoded_len()
2078		//
2079		// Statement encoding (encodes as Vec<Field>):
2080		// - Compact<u32> for number of fields (1 byte for value 2: expiry + data)
2081		// - Field::Expiry discriminant (1 byte, value 2)
2082		// - u64 expiry value (8 bytes)
2083		// - Field::Data discriminant (1 byte, value 8)
2084		// - Compact<u32> for the data length (2 bytes for small data)
2085		// So per-statement overhead = 1 + 1 + 8 + 1 + 2 = 13 bytes
2086		let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
2087		let num_statements: usize = 100;
2088		let per_statement_overhead = 1 + 1 + 8 + 1 + 2; // Vec<Field> length + expiry field + data discriminant + Compact data length
2089		let total_overhead = per_statement_overhead * num_statements;
2090		let total_data_size = max_size - total_overhead;
2091		let per_statement_data_size = total_data_size / num_statements;
2092		let remainder = total_data_size % num_statements;
2093
2094		let mut expected_hashes = Vec::with_capacity(num_statements);
2095		let mut total_encoded_size = 0;
2096
2097		for i in 0..num_statements {
2098			let mut statement = Statement::new();
2099			// Distribute remainder across first `remainder` statements to exactly fill max_size
2100			let extra = if i < remainder { 1 } else { 0 };
2101			let mut data = vec![42u8; per_statement_data_size + extra];
2102			// Make each statement unique by modifying the first few bytes
2103			data[0] = i as u8;
2104			data[1] = (i >> 8) as u8;
2105			statement.set_plain_data(data);
2106
2107			total_encoded_size += statement.encoded_size();
2108
2109			let hash = statement.hash();
2110			expected_hashes.push(hash);
2111			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
2112		}
2113
2114		// Verify our calculation: total encoded size should be <= max_size
2115		assert!(
2116			total_encoded_size == max_size,
2117			"Total encoded size {} should be <= max_size {}",
2118			total_encoded_size,
2119			max_size
2120		);
2121
2122		handler.propagate_statements().await;
2123
2124		let sent = notification_service.get_sent_notifications();
2125
2126		// All statements should fit in a single chunk
2127		assert_eq!(
2128			sent.len(),
2129			1,
2130			"Expected 1 notification for all {} statements, but got {}",
2131			num_statements,
2132			sent.len()
2133		);
2134
2135		let (_peer, notification) = &sent[0];
2136		assert!(
2137			notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
2138			"Notification size {} exceeds limit {}",
2139			notification.len(),
2140			MAX_STATEMENT_NOTIFICATION_SIZE
2141		);
2142
2143		let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
2144		assert_eq!(
2145			decoded.len(),
2146			num_statements,
2147			"Expected {} statements in the notification",
2148			num_statements
2149		);
2150
2151		// Verify all statements were sent (order may differ due to HashMap iteration)
2152		let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
2153		expected_hashes.sort();
2154		received_hashes.sort();
2155		assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
2156	}
2157
2158	#[tokio::test]
2159	async fn test_initial_sync_burst_size_limit_consistency() {
2160		// This test verifies that process_initial_sync_burst and find_sendable_chunk
2161		// use the same size limit (max_statement_payload_size).
2162		//
2163		// Previously there was a bug where the filter in process_initial_sync_burst used
2164		// MAX_STATEMENT_NOTIFICATION_SIZE, but find_sendable_chunk reserved extra space
2165		// for Compact::<u32>::max_encoded_len(). This caused a debug_assert failure when
2166		// statements fit the filter but not find_sendable_chunk.
2167		//
2168		// With the fix, both use max_statement_payload_size(), so the filter will reject
2169		// statements that wouldn't fit in find_sendable_chunk.
2170		let (mut handler, statement_store, _network, notification_service) =
2171			build_handler_no_peers();
2172
2173		let payload_limit = max_statement_payload_size();
2174
2175		// Create first statement that's just over half the payload limit
2176		let first_stmt_data_size = payload_limit / 2 + 10;
2177		let mut stmt1 = Statement::new();
2178		stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
2179		let stmt1_encoded_size = stmt1.encoded_size();
2180
2181		// Create second statement that, combined with the first, exceeds the payload limit.
2182		// This means the filter will only accept the first statement.
2183		let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
2184		let target_stmt2_encoded = remaining + 3; // 3 bytes over limit when combined
2185		let stmt2_data_size = target_stmt2_encoded.saturating_sub(4); // ~4 bytes encoding overhead
2186		let mut stmt2 = Statement::new();
2187		stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
2188		let stmt2_encoded_size = stmt2.encoded_size();
2189
2190		let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
2191
2192		// Verify our setup: total exceeds payload limit
2193		assert!(
2194			total_encoded > payload_limit,
2195			"Total {} should exceed payload_limit {} so filter rejects second statement",
2196			total_encoded,
2197			payload_limit
2198		);
2199
2200		let hash1 = stmt1.hash();
2201		let hash2 = stmt2.hash();
2202		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
2203		statement_store.statements.lock().unwrap().insert(hash2, stmt2);
2204
2205		// Setup peer and simulate connection
2206		let peer_id = PeerId::random();
2207
2208		handler
2209			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2210				peer: peer_id,
2211				direction: sc_network::service::traits::Direction::Inbound,
2212				handshake: vec![],
2213				negotiated_fallback: None,
2214			})
2215			.await;
2216
2217		// Verify initial sync was queued with both hashes
2218		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2219		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
2220
2221		// Process first burst - should send only one statement (the other doesn't fit)
2222		handler.process_initial_sync_burst().await;
2223
2224		// With the fix, the filter and find_sendable_chunk use the same limit,
2225		// so no assertion failure occurs. Only one statement is fetched and sent.
2226		let sent = notification_service.get_sent_notifications();
2227		assert_eq!(sent.len(), 1, "First burst should send one notification");
2228
2229		let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
2230		assert_eq!(decoded.len(), 1, "First notification should contain one statement");
2231
2232		// Verify one of the two statements was sent (order is non-deterministic due to HashMap)
2233		let sent_hash = decoded[0].hash();
2234		assert!(
2235			sent_hash == hash1 || sent_hash == hash2,
2236			"Sent statement should be one of the two created"
2237		);
2238
2239		// Second statement should still be pending
2240		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2241		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
2242
2243		// Process second burst - should send the remaining statement
2244		handler.process_initial_sync_burst().await;
2245
2246		let sent = notification_service.get_sent_notifications();
2247		assert_eq!(sent.len(), 2, "Second burst should send another notification");
2248
2249		// Both statements should now be sent
2250		let mut sent_hashes: Vec<_> = sent
2251			.iter()
2252			.flat_map(|(_, notification)| {
2253				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2254			})
2255			.map(|s| s.hash())
2256			.collect();
2257		sent_hashes.sort();
2258		let mut expected_hashes = vec![hash1, hash2];
2259		expected_hashes.sort();
2260		assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
2261
2262		// No more pending
2263		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
2264	}
2265
2266	#[tokio::test]
2267	async fn test_peer_disconnected_on_flooding() {
2268		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2269			build_handler();
2270
2271		let peer_id = *handler.peers.keys().next().unwrap();
2272
2273		let mut flood_statements = Vec::new();
2274		for i in 0..600_000 {
2275			let mut statement = Statement::new();
2276			statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2277			flood_statements.push(statement);
2278		}
2279
2280		handler.on_statements(peer_id, flood_statements);
2281
2282		let reports = network.get_reports();
2283		assert!(
2284			reports
2285				.iter()
2286				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2287			"Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2288			reports
2289		);
2290
2291		let disconnected = network.get_disconnected_peers();
2292		assert!(
2293			disconnected.contains(&peer_id),
2294			"Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2295			peer_id,
2296			disconnected
2297		);
2298
2299		// Verify peer state was cleaned up
2300		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2301		assert!(
2302			!handler.pending_initial_syncs.contains_key(&peer_id),
2303			"Peer should be removed from pending_initial_syncs"
2304		);
2305		assert!(
2306			!handler.initial_sync_peer_queue.contains(&peer_id),
2307			"Peer should be removed from initial_sync_peer_queue"
2308		);
2309	}
2310
2311	#[tokio::test]
2312	async fn test_legitimate_traffic_not_flagged() {
2313		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2314			build_handler();
2315
2316		let peer_id = *handler.peers.keys().next().unwrap();
2317
2318		let start = std::time::Instant::now();
2319		let duration = std::time::Duration::from_secs(5);
2320		let mut counter = 0u32;
2321
2322		while start.elapsed() < duration {
2323			let mut statements = Vec::new();
2324			for i in 0..5_000 {
2325				let mut statement = Statement::new();
2326				statement.set_plain_data(vec![
2327					counter as u8,
2328					(counter >> 8) as u8,
2329					(counter >> 16) as u8,
2330					i as u8,
2331				]);
2332				statements.push(statement);
2333				counter = counter.wrapping_add(1);
2334			}
2335
2336			handler.on_statements(peer_id, statements);
2337
2338			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2339		}
2340
2341		let reports = network.get_reports();
2342		assert!(
2343			!reports
2344				.iter()
2345				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2346			"Legitimate traffic should not trigger flooding detection. Reports: {:?}",
2347			reports
2348		);
2349
2350		let disconnected = network.get_disconnected_peers();
2351		assert!(
2352			!disconnected.contains(&peer_id),
2353			"Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
2354			disconnected
2355		);
2356
2357		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
2358	}
2359
2360	#[tokio::test]
2361	async fn test_just_over_rate_limit_triggers_flooding() {
2362		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2363			build_handler();
2364
2365		let peer_id = *handler.peers.keys().next().unwrap();
2366
2367		let mut statements = Vec::new();
2368		for i in 0..260_000 {
2369			let mut statement = Statement::new();
2370			statement.set_plain_data(vec![
2371				i as u8,
2372				(i >> 8) as u8,
2373				(i >> 16) as u8,
2374				(i >> 24) as u8,
2375			]);
2376			statements.push(statement);
2377		}
2378
2379		handler.on_statements(peer_id, statements);
2380
2381		let reports = network.get_reports();
2382		let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2383		assert!(
2384			reports
2385				.iter()
2386				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2387			"Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2388			expected_burst,
2389			reports
2390		);
2391
2392		let disconnected = network.get_disconnected_peers();
2393		assert!(
2394			disconnected.contains(&peer_id),
2395			"Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2396			disconnected
2397		);
2398
2399		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2400	}
2401
2402	#[tokio::test]
2403	async fn test_burst_of_250k_statements_allowed() {
2404		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2405			build_handler();
2406
2407		let peer_id = *handler.peers.keys().next().unwrap();
2408
2409		let mut statements = Vec::new();
2410		for i in 0..250_000 {
2411			let mut statement = Statement::new();
2412			statement.set_plain_data(vec![
2413				i as u8,
2414				(i >> 8) as u8,
2415				(i >> 16) as u8,
2416				(i >> 24) as u8,
2417			]);
2418			statements.push(statement);
2419		}
2420
2421		handler.on_statements(peer_id, statements);
2422
2423		let reports = network.get_reports();
2424		assert!(
2425			!reports
2426				.iter()
2427				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2428			"250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2429			reports
2430		);
2431
2432		assert!(
2433			handler.peers.contains_key(&peer_id),
2434			"Peer should still be connected after 250k burst"
2435		);
2436	}
2437
2438	#[tokio::test]
2439	async fn test_sustained_rate_above_limit_triggers_flooding() {
2440		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2441			build_handler();
2442
2443		let peer_id = *handler.peers.keys().next().unwrap();
2444
2445		let mut counter = 0u32;
2446
2447		let start = std::time::Instant::now();
2448		let duration = std::time::Duration::from_secs(5);
2449
2450		let mut flooding_detected = false;
2451		while start.elapsed() < duration {
2452			let mut statements = Vec::new();
2453			for i in 0..30_000 {
2454				let mut statement = Statement::new();
2455				statement.set_plain_data(vec![
2456					counter as u8,
2457					(counter >> 8) as u8,
2458					(counter >> 16) as u8,
2459					i as u8,
2460				]);
2461				statements.push(statement);
2462				counter = counter.wrapping_add(1);
2463			}
2464
2465			handler.on_statements(peer_id, statements);
2466
2467			// Check if flooding was detected
2468			let reports = network.get_reports();
2469			if reports
2470				.iter()
2471				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
2472			{
2473				flooding_detected = true;
2474				break;
2475			}
2476
2477			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2478		}
2479
2480		assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
2481
2482		let disconnected = network.get_disconnected_peers();
2483		assert!(
2484			disconnected.contains(&peer_id),
2485			"Peer should be disconnected after sustained high rate. Disconnected: {:?}",
2486			disconnected
2487		);
2488
2489		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2490	}
2491
2492	/// Verifies that peers connecting during major sync are buffered in `deferred_peers` with no
2493	/// network calls, and that a disconnect before sync ends removes the peer from the buffer
2494	#[test]
2495	fn major_sync_defers_peers_and_handles_disconnect() {
2496		let (sync, _flag) = TestSync::with_syncing(true);
2497		let network = TestNetwork::new();
2498		let notification_service = TestNotificationService::new();
2499		let statement_store = TestStatementStore::new();
2500		let (queue_sender, _queue_receiver) = async_channel::bounded(100);
2501
2502		let mut handler = StatementHandler {
2503			protocol_name: "/statement/1".into(),
2504			notification_service: Box::new(notification_service),
2505			propagate_timeout: (Box::pin(futures::stream::pending())
2506				as Pin<Box<dyn Stream<Item = ()> + Send>>)
2507				.fuse(),
2508			pending_statements: FuturesUnordered::new(),
2509			pending_statements_peers: HashMap::new(),
2510			network: network.clone(),
2511			sync,
2512			sync_event_stream: (Box::pin(futures::stream::pending())
2513				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2514				.fuse(),
2515			peers: HashMap::new(),
2516			statement_store: Arc::new(statement_store),
2517			queue_sender,
2518			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2519				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2520			metrics: None,
2521			initial_sync_timeout: Box::pin(futures::future::pending()),
2522			pending_initial_syncs: HashMap::new(),
2523			initial_sync_peer_queue: VecDeque::new(),
2524			deferred_peers: HashSet::new(),
2525			dropped_statements_during_sync: false,
2526			sync_recovery_peer: None,
2527			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
2528		};
2529
2530		let peer1 = PeerId::random();
2531		let peer2 = PeerId::random();
2532		let peer3 = PeerId::random();
2533
2534		handler.handle_sync_event(SyncEvent::PeerConnected(peer1));
2535		handler.handle_sync_event(SyncEvent::PeerConnected(peer2));
2536		handler.handle_sync_event(SyncEvent::PeerConnected(peer3));
2537
2538		// No network calls while major sync is active
2539		assert!(network.get_added_reserved().is_empty());
2540		assert!(network.get_removed_reserved().is_empty());
2541		assert_eq!(handler.deferred_peers.len(), 3);
2542
2543		// Disconnect before sync ends must remove from buffer only
2544		handler.handle_sync_event(SyncEvent::PeerDisconnected(peer1));
2545		assert_eq!(handler.deferred_peers.len(), 2);
2546		assert!(!handler.deferred_peers.contains(&peer1), "disconnected peer must leave buffer");
2547		assert!(handler.deferred_peers.contains(&peer2));
2548		assert!(handler.deferred_peers.contains(&peer3));
2549		assert!(network.get_removed_reserved().is_empty(), "no remove call for buffered peer");
2550	}
2551
2552	#[test]
2553	fn deferred_peers_flushed_on_sync_end_without_remove() {
2554		let (sync, flag) = TestSync::with_syncing(true);
2555		let network = TestNetwork::new();
2556		let notification_service = TestNotificationService::new();
2557		let statement_store = TestStatementStore::new();
2558		let (queue_sender, _queue_receiver) = async_channel::bounded(100);
2559
2560		let peer1 = PeerId::random();
2561		let peer2 = PeerId::random();
2562		let mut deferred = HashSet::new();
2563		deferred.insert(peer1);
2564		deferred.insert(peer2);
2565
2566		let mut handler = StatementHandler {
2567			protocol_name: "/statement/1".into(),
2568			notification_service: Box::new(notification_service),
2569			propagate_timeout: (Box::pin(futures::stream::pending())
2570				as Pin<Box<dyn Stream<Item = ()> + Send>>)
2571				.fuse(),
2572			pending_statements: FuturesUnordered::new(),
2573			pending_statements_peers: HashMap::new(),
2574			network: network.clone(),
2575			sync,
2576			sync_event_stream: (Box::pin(futures::stream::pending())
2577				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2578				.fuse(),
2579			peers: HashMap::new(),
2580			statement_store: Arc::new(statement_store),
2581			queue_sender,
2582			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2583				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2584			metrics: None,
2585			initial_sync_timeout: Box::pin(futures::future::pending()),
2586			pending_initial_syncs: HashMap::new(),
2587			initial_sync_peer_queue: VecDeque::new(),
2588			deferred_peers: deferred,
2589			dropped_statements_during_sync: false,
2590			sync_recovery_peer: None,
2591			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
2592		};
2593
2594		flag.store(false, Ordering::Relaxed);
2595		handler.drain_deferred_peers();
2596
2597		assert!(handler.deferred_peers.is_empty());
2598
2599		let added = network.get_added_reserved();
2600		assert_eq!(added.len(), 1);
2601		let added_addrs = &added[0];
2602		let expected_addr1: sc_network::Multiaddr =
2603			iter::once(multiaddr::Protocol::P2p(peer1.into())).collect();
2604		let expected_addr2: sc_network::Multiaddr =
2605			iter::once(multiaddr::Protocol::P2p(peer2.into())).collect();
2606		assert!(added_addrs.contains(&expected_addr1), "peer1 must be in added set");
2607		assert!(added_addrs.contains(&expected_addr2), "peer2 must be in added set");
2608
2609		assert!(network.get_removed_reserved().is_empty());
2610	}
2611
2612	#[tokio::test]
2613	async fn sync_recovery_schedules_remove_for_one_connected_peer() {
2614		let network = TestNetwork::new();
2615		let notification_service = TestNotificationService::new();
2616		let (sync, _flag) = TestSync::with_syncing(false);
2617		let (queue_sender, _) = async_channel::bounded(2);
2618		let statement_store = TestStatementStore::new();
2619
2620		let connected_peer = PeerId::random();
2621
2622		let mut peers = HashMap::new();
2623		peers.insert(
2624			connected_peer,
2625			Peer {
2626				known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
2627				rate_limiter: PeerRateLimiter::new(
2628					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2629						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2630					NonZeroU32::new(
2631						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
2632					)
2633					.expect("burst capacity is nonzero"),
2634				),
2635			},
2636		);
2637
2638		let mut handler = StatementHandler {
2639			protocol_name: "/statement/1".into(),
2640			notification_service: Box::new(notification_service),
2641			propagate_timeout: (Box::pin(futures::stream::pending())
2642				as Pin<Box<dyn Stream<Item = ()> + Send>>)
2643				.fuse(),
2644			pending_statements: FuturesUnordered::new(),
2645			pending_statements_peers: HashMap::new(),
2646			network: network.clone(),
2647			sync,
2648			sync_event_stream: (Box::pin(futures::stream::pending())
2649				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2650				.fuse(),
2651			peers,
2652			statement_store: Arc::new(statement_store),
2653			queue_sender,
2654			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2655				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2656			metrics: None,
2657			initial_sync_timeout: Box::pin(futures::future::pending()),
2658			pending_initial_syncs: HashMap::new(),
2659			initial_sync_peer_queue: VecDeque::new(),
2660			deferred_peers: HashSet::new(),
2661			dropped_statements_during_sync: true,
2662			sync_recovery_peer: None,
2663			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
2664		};
2665
2666		handler.start_sync_recovery();
2667
2668		// One remove call must have been issued for the connected peer
2669		{
2670			let removed = network.removed_reserved.lock().unwrap();
2671			assert_eq!(
2672				removed.len(),
2673				1,
2674				"Expected exactly one remove_peers_from_reserved_set call"
2675			);
2676			assert!(removed[0].contains(&connected_peer));
2677		}
2678
2679		// The recovery peer must be stored and the timeout future must be armed
2680		assert_eq!(handler.sync_recovery_peer, Some(connected_peer));
2681
2682		// Calling try_readd_sync_recovery_peer directly (as the select arm would after the future
2683		// resolves) must re-add the peer and clear the field
2684		handler.try_readd_sync_recovery_peer();
2685		assert!(handler.sync_recovery_peer.is_none());
2686		{
2687			let added = network.added_reserved.lock().unwrap();
2688			assert_eq!(added.len(), 1);
2689			let expected_addr: multiaddr::Multiaddr =
2690				iter::once(multiaddr::Protocol::P2p(connected_peer.into())).collect();
2691			assert!(added[0].contains(&expected_addr));
2692		}
2693
2694		// Re-entry guard: restore state to simulate a second sync-end while recovery is still
2695		// in flight (sync_recovery_peer is Some). The second call must not issue another remove.
2696		{
2697			let peer2 = PeerId::random();
2698			handler.sync_recovery_peer = Some(peer2);
2699			handler.start_sync_recovery();
2700			assert_eq!(
2701				handler.sync_recovery_peer,
2702				Some(peer2),
2703				"Re-entry guard: recovery peer must not change on second call"
2704			);
2705			assert_eq!(
2706				network.removed_reserved.lock().unwrap().len(),
2707				1,
2708				"Re-entry guard: no extra remove call while recovery is in flight"
2709			);
2710		}
2711	}
2712
2713	#[tokio::test]
2714	async fn sync_recovery_gated_by_dropped_statements_flag() {
2715		let make_peer = || Peer {
2716			known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
2717			rate_limiter: PeerRateLimiter::new(
2718				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2719					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2720				NonZeroU32::new(
2721					DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
2722				)
2723				.expect("burst capacity is nonzero"),
2724			),
2725		};
2726
2727		let make_handler =
2728			|network: TestNetwork, dropped: bool| -> StatementHandler<TestNetwork, TestSync> {
2729				let (sync, _) = TestSync::with_syncing(false);
2730				let (queue_sender, _) = async_channel::bounded(2);
2731				let mut peers = HashMap::new();
2732				peers.insert(PeerId::random(), make_peer());
2733				StatementHandler {
2734					protocol_name: "/statement/1".into(),
2735					notification_service: Box::new(TestNotificationService::new()),
2736					propagate_timeout: (Box::pin(futures::stream::pending())
2737						as Pin<Box<dyn Stream<Item = ()> + Send>>)
2738						.fuse(),
2739					pending_statements: FuturesUnordered::new(),
2740					pending_statements_peers: HashMap::new(),
2741					network,
2742					sync,
2743					sync_event_stream: (Box::pin(futures::stream::pending())
2744						as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2745						.fuse(),
2746					peers,
2747					statement_store: Arc::new(TestStatementStore::new()),
2748					queue_sender,
2749					statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2750						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2751					metrics: None,
2752					initial_sync_timeout: Box::pin(futures::future::pending()),
2753					pending_initial_syncs: HashMap::new(),
2754					initial_sync_peer_queue: VecDeque::new(),
2755					deferred_peers: HashSet::new(),
2756					dropped_statements_during_sync: dropped,
2757					sync_recovery_peer: None,
2758					sync_recovery_readd_timeout: Box::pin(pending().fuse()),
2759				}
2760			};
2761
2762		// flag=false → no recovery
2763		let net = TestNetwork::new();
2764		let mut handler = make_handler(net.clone(), false);
2765		handler.start_sync_recovery();
2766		assert!(handler.sync_recovery_peer.is_none());
2767		assert!(net.get_removed_reserved().is_empty());
2768
2769		// flag=true → recovery fires
2770		let net2 = TestNetwork::new();
2771		let mut handler2 = make_handler(net2.clone(), true);
2772		handler2.start_sync_recovery();
2773		assert!(handler2.sync_recovery_peer.is_some());
2774		assert_eq!(net2.get_removed_reserved().len(), 1);
2775	}
2776}