Skip to main content

soil_network/statement/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Statement handling to plug on top of the network service.
8//!
9//! Usage:
10//!
11//! - Use [`StatementHandlerPrototype::new`] to create a prototype.
12//! - Pass the `NonDefaultSetConfig` returned from [`StatementHandlerPrototype::new`] to the network
13//!   configuration as an extra peers set.
//! - Use [`StatementHandlerPrototype::build`] then [`StatementHandler::run`] to obtain a
//!   `Future` that processes statements.
16
17use self::config::*;
18
19use codec::{Compact, Decode, Encode, MaxEncodedLen};
20#[cfg(any(test, feature = "test-helpers"))]
21use futures::future::pending;
22use futures::{channel::oneshot, future::FusedFuture, prelude::*, stream::FuturesUnordered};
23use governor::{
24	clock::DefaultClock,
25	state::{InMemoryState, NotKeyed},
26	Quota, RateLimiter,
27};
28use soil_prometheus::{
29	exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
30	Registry, U64,
31};
32use soil_network::sync::{SyncEvent, SyncEventStream};
33use soil_network::types::PeerId;
34use soil_network::{
35	config::{NonReservedPeerMode, SetConfig},
36	error, multiaddr,
37	peer_store::PeerStoreProvider,
38	service::{
39		traits::{NotificationEvent, NotificationService, ValidationResult},
40		NotificationMetrics,
41	},
42	types::ProtocolName,
43	utils::{interval, LruHashSet},
44	NetworkBackend, NetworkEventStream, NetworkPeers,
45};
46use soil_statement_store::{
47	FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
48};
49use std::{
50	collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
51	iter,
52	num::{NonZeroU32, NonZeroUsize},
53	pin::Pin,
54	sync::Arc,
55	time::Instant,
56};
57use subsoil::runtime::traits::Block as BlockT;
58use tokio::time::timeout;
59pub mod config;
60
/// A batch of statements, as decoded from a single statement notification.
pub type Statements = Vec<Statement>;
/// Future resolving to statement import result.
///
/// This is the receiving half of a oneshot channel carrying a [`SubmitResult`].
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
65
/// Reputation changes reported for peers in response to the statements they send us.
mod rep {
	use soil_network::ReputationChange as Rep;
	/// Reputation change when a peer sends us any statement.
	///
	/// This forces the node to verify it, thus the negative value here. Once the statement is
	/// verified, the reputation change should be refunded with `ANY_STATEMENT_REFUND`.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Reputation change when a peer sends us any statement that is not invalid.
	///
	/// Exactly offsets `ANY_STATEMENT`.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
	/// Reputation change when a peer sends us a statement that we didn't know about.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
	/// Reputation change when a peer sends us an invalid statement.
	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
	/// Reputation change when a peer sends us a duplicate statement.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Reputation change when a peer floods us with statements.
	///
	/// Built with `Rep::new_fatal`, unlike the graded changes above.
	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
}
84
/// Log target used by the log statements in this module.
const LOG_TARGET: &str = "statement-gossip";
/// Maximum time we wait for sending a notification to a peer.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval for sending statement batches during initial sync to new peers.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
90
/// Prometheus metrics exposed by the statement handler.
///
/// All of them are created and registered by `Metrics::register`.
struct Metrics {
	/// Number of statements propagated to at least one peer.
	propagated_statements: Counter<U64>,
	/// Number of statements received via gossiping that were already in the statement store.
	known_statements_received: Counter<U64>,
	/// Number of oversized statements that were skipped instead of being gossiped.
	skipped_oversized_statements: Counter<U64>,
	/// Distribution of chunk sizes when propagating statements.
	propagated_statements_chunks: Histogram,
	/// Number of pending statement validations.
	pending_statements: Gauge<U64>,
	/// Number of statements ignored due to exceeding the `MAX_PENDING_STATEMENTS` limit.
	ignored_statements: Counter<U64>,
	/// Number of peers connected using the statement protocol.
	peers_connected: Gauge<U64>,
	/// Total number of statements received from peers.
	statements_received: Counter<U64>,
	/// Total bytes sent for statement protocol messages.
	bytes_sent_total: Counter<U64>,
	/// Total bytes received for statement protocol messages.
	bytes_received_total: Counter<U64>,
	/// Time taken to send statement messages to peers.
	sent_latency_seconds: Histogram,
	/// Total statements sent during initial sync bursts to newly connected peers.
	initial_sync_statements_sent: Counter<U64>,
	/// Total number of initial sync burst rounds processed.
	initial_sync_bursts_total: Counter<U64>,
	/// Number of peers currently being synced via initial sync.
	initial_sync_peers_active: Gauge<U64>,
	/// Per-peer total duration of initial sync from start to completion.
	initial_sync_duration_seconds: Histogram,
	/// Number of peers disconnected for exceeding statement rate limits.
	statement_flooding_detected: Counter<U64>,
}
109
110impl Metrics {
111	fn register(r: &Registry) -> Result<Self, PrometheusError> {
112		Ok(Self {
113			propagated_statements: register(
114				Counter::new(
115					"substrate_sync_propagated_statements",
116					"Number of statements propagated to at least one peer",
117				)?,
118				r,
119			)?,
120			known_statements_received: register(
121				Counter::new(
122					"substrate_sync_known_statement_received",
123					"Number of statements received via gossiping that were already in the statement store",
124				)?,
125				r,
126			)?,
127			skipped_oversized_statements: register(
128				Counter::new(
129					"substrate_sync_skipped_oversized_statements",
130					"Number of oversized statements that were skipped to be gossiped",
131				)?,
132				r,
133			)?,
134			propagated_statements_chunks: register(
135				Histogram::with_opts(
136					HistogramOpts::new(
137						"substrate_sync_propagated_statements_chunks",
138						"Distribution of chunk sizes when propagating statements",
139					)
140					.buckets(exponential_buckets(1.0, 2.0, 14)?),
141				)?,
142				r,
143			)?,
144			pending_statements: register(
145				Gauge::new(
146					"substrate_sync_pending_statement_validations",
147					"Number of pending statement validations",
148				)?,
149				r,
150			)?,
151			ignored_statements: register(
152				Counter::new(
153					"substrate_sync_ignored_statements",
154					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
155				)?,
156				r,
157			)?,
158			peers_connected: register(
159				Gauge::new(
160					"substrate_sync_statement_peers_connected",
161					"Number of peers connected using the statement protocol",
162				)?,
163				r,
164			)?,
165			statements_received: register(
166				Counter::new(
167					"substrate_sync_statements_received",
168					"Total number of statements received from peers",
169				)?,
170				r,
171			)?,
172			bytes_sent_total: register(
173				Counter::new(
174					"substrate_sync_statement_bytes_sent_total",
175					"Total bytes sent for statement protocol messages",
176				)?,
177				r,
178			)?,
179			bytes_received_total: register(
180				Counter::new(
181					"substrate_sync_statement_bytes_received_total",
182					"Total bytes received for statement protocol messages",
183				)?,
184				r,
185			)?,
186			sent_latency_seconds: register(
187				Histogram::with_opts(
188					HistogramOpts::new(
189						"substrate_sync_statement_sent_latency_seconds",
190						"Time to send statement messages to peers",
191					)
192					// Buckets from 1μs to ~1s covering microsecond to millisecond range.
193					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
194				)?,
195				r,
196			)?,
197			initial_sync_statements_sent: register(
198				Counter::new(
199					"substrate_sync_initial_sync_statements_sent",
200					"Total statements sent during initial sync bursts to newly connected peers",
201				)?,
202				r,
203			)?,
204			initial_sync_bursts_total: register(
205				Counter::new(
206					"substrate_sync_initial_sync_bursts_total",
207					"Total number of initial sync burst rounds processed",
208				)?,
209				r,
210			)?,
211			initial_sync_peers_active: register(
212				Gauge::new(
213					"substrate_sync_initial_sync_peers_active",
214					"Number of peers currently being synced via initial sync",
215				)?,
216				r,
217			)?,
218			initial_sync_duration_seconds: register(
219				Histogram::with_opts(
220					HistogramOpts::new(
221						"substrate_sync_initial_sync_duration_seconds",
222						"Per-peer total duration of initial sync from start to completion",
223					)
224					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
225				)?,
226				r,
227			)?,
228			statement_flooding_detected: register(
229				Counter::new(
230					"substrate_sync_statement_flooding_detected",
231					"Number of peers disconnected for exceeding statement rate limits",
232				)?,
233				r,
234			)?,
235		})
236	}
237}
238
/// Prototype for a [`StatementHandler`].
///
/// Created by [`StatementHandlerPrototype::new`] and turned into the actual handler by
/// [`StatementHandlerPrototype::build`].
pub struct StatementHandlerPrototype {
	/// Name of the statement notification protocol, derived from the genesis hash and the
	/// optional fork id.
	protocol_name: ProtocolName,
	/// Notification service handle obtained when the protocol configuration was created.
	notification_service: Box<dyn NotificationService>,
}
244
245impl StatementHandlerPrototype {
246	/// Create a new instance.
247	pub fn new<
248		Hash: AsRef<[u8]>,
249		Block: BlockT,
250		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
251	>(
252		genesis_hash: Hash,
253		fork_id: Option<&str>,
254		metrics: NotificationMetrics,
255		peer_store_handle: Arc<dyn PeerStoreProvider>,
256	) -> (Self, Net::NotificationProtocolConfig) {
257		let genesis_hash = genesis_hash.as_ref();
258		let protocol_name = if let Some(fork_id) = fork_id {
259			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
260		} else {
261			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
262		};
263		let (config, notification_service) = Net::notification_config(
264			protocol_name.clone().into(),
265			Vec::new(),
266			MAX_STATEMENT_NOTIFICATION_SIZE,
267			None,
268			SetConfig {
269				in_peers: 0,
270				out_peers: 0,
271				reserved_nodes: Vec::new(),
272				non_reserved_mode: NonReservedPeerMode::Deny,
273			},
274			metrics,
275			peer_store_handle,
276		);
277
278		(Self { protocol_name: protocol_name.into(), notification_service }, config)
279	}
280
281	/// Turns the prototype into the actual handler.
282	///
283	/// Important: the statements handler is initially disabled and doesn't gossip statements.
284	/// Gossiping is enabled when major syncing is done.
285	pub fn build<
286		N: NetworkPeers + NetworkEventStream,
287		S: SyncEventStream + soil_client::consensus::SyncOracle,
288	>(
289		self,
290		network: N,
291		sync: S,
292		statement_store: Arc<dyn StatementStore>,
293		metrics_registry: Option<&Registry>,
294		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
295		mut num_submission_workers: usize,
296		statements_per_second: u32,
297	) -> error::Result<StatementHandler<N, S>> {
298		let sync_event_stream = sync.event_stream("statement-handler-sync");
299		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
300
301		if num_submission_workers == 0 {
302			log::warn!(
303				target: LOG_TARGET,
304				"num_submission_workers is 0, defaulting to 1"
305			);
306			num_submission_workers = 1;
307		}
308
309		let statements_per_second = match NonZeroU32::new(statements_per_second) {
310			Some(rate) => rate,
311			None => {
312				log::warn!(
313					target: LOG_TARGET,
314					"statements_per_second is 0, defaulting to {}",
315					DEFAULT_STATEMENTS_PER_SECOND
316				);
317				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
318					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
319			},
320		};
321
322		let metrics =
323			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };
324
325		for _ in 0..num_submission_workers {
326			let store = statement_store.clone();
327			let mut queue_receiver = queue_receiver.clone();
328			executor(
329				async move {
330					loop {
331						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
332							queue_receiver.next().await;
333						match task {
334							None => return,
335							Some((statement, completion)) => {
336								let result = store.submit(statement, StatementSource::Network);
337								if completion.send(result).is_err() {
338									log::debug!(
339										target: LOG_TARGET,
340										"Error sending validation completion"
341									);
342								}
343							},
344						}
345					}
346				}
347				.boxed(),
348			);
349		}
350
351		let handler = StatementHandler {
352			protocol_name: self.protocol_name,
353			notification_service: self.notification_service,
354			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
355				as Pin<Box<dyn Stream<Item = ()> + Send>>)
356				.fuse(),
357			pending_statements: FuturesUnordered::new(),
358			pending_statements_peers: HashMap::new(),
359			network,
360			sync,
361			sync_event_stream: sync_event_stream.fuse(),
362			peers: HashMap::new(),
363			statement_store,
364			queue_sender,
365			statements_per_second,
366			metrics,
367			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
368			pending_initial_syncs: HashMap::new(),
369			initial_sync_peer_queue: VecDeque::new(),
370		};
371
372		Ok(handler)
373	}
374}
375
/// Handler for statements. Call [`StatementHandler::run`] to start the processing.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + soil_client::consensus::SyncOracle,
> {
	/// Name of the statement notification protocol; used when managing the reserved peer set
	/// and when disconnecting peers.
	protocol_name: ProtocolName,
	/// Interval at which we call `propagate_statements`.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// Pending statements verification tasks.
	///
	/// Each future resolves to the statement hash and, if the submission worker answered, the
	/// import result.
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	/// As multiple peers can send us the same statement, we group
	/// these peers using the statement hash while the statement is
	/// imported. This prevents that we import the same statement
	/// multiple times concurrently.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	/// Network service to use to send messages and manage peers.
	network: N,
	/// Syncing service.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// Notification service.
	notification_service: Box<dyn NotificationService>,
	/// All connected peers.
	peers: HashMap<PeerId, Peer>,
	/// Statement store queried for known statements and read from when propagating.
	statement_store: Arc<dyn StatementStore>,
	/// Sending half of the bounded queue that feeds received statements, together with a
	/// completion channel, to the submission workers.
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	/// Maximum statements per second per peer.
	statements_per_second: NonZeroU32,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
	/// Timeout for sending next statement batch during initial sync.
	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	/// Pending initial syncs per peer.
	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
	/// Queue for round-robin processing of initial syncs.
	initial_sync_peer_queue: VecDeque<PeerId>,
}
415
/// Per-peer rate limiter using a token bucket algorithm.
///
/// The token bucket allows short bursts up to the per-second limit while enforcing
/// the average rate over time.
#[derive(Debug)]
struct PeerRateLimiter {
	/// Un-keyed, in-memory `governor` rate limiter holding this peer's quota.
	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
424
425impl PeerRateLimiter {
426	fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
427		let quota = Quota::per_second(statements_per_second).allow_burst(burst);
428		Self { limiter: RateLimiter::direct(quota) }
429	}
430
431	/// Check if receiving `count` statements would exceed the rate limit.
432	fn is_flooding(&self, count: usize) -> bool {
433		if count > u32::MAX as usize {
434			return true;
435		}
436
437		let Some(n) = NonZeroU32::new(count as u32) else {
438			return false;
439		};
440		!matches!(self.limiter.check_n(n), Ok(Ok(())))
441	}
442}
443
/// Peer information
///
/// One instance is kept per peer with an open statement notification stream.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
	/// Holds a set of statements known to this peer.
	///
	/// Hashes are inserted both for statements received from the peer and for statements
	/// selected to be sent to it.
	known_statements: LruHashSet<Hash>,
	/// Rate limiter for statement flooding protection.
	rate_limiter: PeerRateLimiter,
}
453
/// Tracks pending initial sync state for a peer (hashes only, statements fetched on-demand).
struct PendingInitialSync {
	/// Statement hashes taken from the statement store when the peer's notification stream
	/// opened.
	// NOTE(review): presumably drained as bursts are sent — confirm in the burst processing code.
	hashes: Vec<Hash>,
	/// When the initial sync was queued; used to report the initial sync duration metric.
	started_at: Instant,
}
459
/// Result of finding a sendable chunk of statements.
///
/// Returned by `find_sendable_chunk`.
enum ChunkResult {
	/// Found a chunk that fits. Contains the end index (exclusive).
	///
	/// The index is `0` only when the input slice was empty.
	Send(usize),
	/// First statement is oversized, skip it.
	SkipOversized,
}
467
/// Result of sending a chunk of statements.
///
/// Returned by `send_statement_chunk`; the caller uses it to decide how far to advance in the
/// list of statements still to be sent.
enum SendChunkResult {
	/// Successfully sent a chunk of N statements.
	Sent(usize),
	/// First statement was oversized and skipped.
	Skipped,
	/// Nothing to send.
	Empty,
	/// Send failed.
	Failed,
}
479
480/// Returns the maximum payload size for statement notifications.
481///
482/// This reserves space for encoding the length of the vector (Compact<u32>),
483/// ensuring the final encoded message fits within MAX_STATEMENT_NOTIFICATION_SIZE.
484fn max_statement_payload_size() -> usize {
485	MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len()
486}
487
488/// Find the largest chunk of statements starting from the beginning that fits
489/// within MAX_STATEMENT_NOTIFICATION_SIZE.
490///
491/// Uses an incremental approach: adds statements one by one until the limit is reached.
492/// This is efficient because we only compute sizes for statements we'll actually send
493/// in this chunk, rather than computing sizes for all statements upfront.
494fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
495	if statements.is_empty() {
496		return ChunkResult::Send(0);
497	}
498	let max_size = max_statement_payload_size();
499
500	// Incrementally add statements until we exceed the limit.
501	// This is efficient because we only compute sizes for statements in this chunk.
502	// accumulated_size is the sum of encoded sizes of all statements so far (without vec
503	// overhead).
504	let mut accumulated_size = 0;
505	let mut count = 0usize;
506
507	for stmt in &statements[0..] {
508		let stmt_size = stmt.encoded_size();
509		let new_count = count + 1;
510		// Compact encoding overhead for the new count
511		let new_total = accumulated_size + stmt_size;
512		if new_total > max_size {
513			break;
514		}
515
516		accumulated_size += stmt_size;
517		count = new_count;
518	}
519
520	// If we couldn't fit even a single statement, skip it.
521	if count == 0 {
522		ChunkResult::SkipOversized
523	} else {
524		ChunkResult::Send(count)
525	}
526}
527
528impl Peer {
529	/// Create a new peer for testing/benchmarking purposes.
530	#[cfg(any(test, feature = "test-helpers"))]
531	pub fn new_for_testing(
532		known_statements: LruHashSet<Hash>,
533		statements_per_second: NonZeroU32,
534		burst: NonZeroU32,
535	) -> Self {
536		Self { known_statements, rate_limiter: PeerRateLimiter::new(statements_per_second, burst) }
537	}
538}
539
540impl<N, S> StatementHandler<N, S>
541where
542	N: NetworkPeers + NetworkEventStream,
543	S: SyncEventStream + soil_client::consensus::SyncOracle,
544{
545	/// Create a new `StatementHandler` for testing/benchmarking purposes.
546	#[cfg(any(test, feature = "test-helpers"))]
547	pub fn new_for_testing(
548		protocol_name: ProtocolName,
549		notification_service: Box<dyn NotificationService>,
550		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
551		network: N,
552		sync: S,
553		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
554		peers: HashMap<PeerId, Peer>,
555		statement_store: Arc<dyn StatementStore>,
556		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
557		statements_per_second: NonZeroU32,
558	) -> Self {
559		Self {
560			protocol_name,
561			notification_service,
562			propagate_timeout,
563			pending_statements: FuturesUnordered::new(),
564			pending_statements_peers: HashMap::new(),
565			network,
566			sync,
567			sync_event_stream,
568			peers,
569			statement_store,
570			queue_sender,
571			statements_per_second,
572			metrics: None,
573			initial_sync_timeout: Box::pin(pending().fuse()),
574			pending_initial_syncs: HashMap::new(),
575			initial_sync_peer_queue: VecDeque::new(),
576		}
577	}
578
	/// Get mutable access to pending statements for testing/benchmarking.
	///
	/// Returns the set of in-flight validation futures, each resolving to a statement hash and
	/// its optional import result.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn pending_statements_mut(
		&mut self,
	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
	{
		&mut self.pending_statements
	}
587
588	/// Turns the [`StatementHandler`] into a future that should run forever and not be
589	/// interrupted.
590	pub async fn run(mut self) {
591		loop {
592			futures::select_biased! {
593				_ = self.propagate_timeout.next() => {
594					self.propagate_statements().await;
595					self.metrics.as_ref().map(|metrics| {
596						metrics.pending_statements.set(self.pending_statements.len() as u64);
597					});
598				},
599				(hash, result) = self.pending_statements.select_next_some() => {
600					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
601						if let Some(result) = result {
602							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
603						}
604					} else {
605						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
606					}
607				},
608				sync_event = self.sync_event_stream.next() => {
609					if let Some(sync_event) = sync_event {
610						self.handle_sync_event(sync_event);
611					} else {
612						// Syncing has seemingly closed. Closing as well.
613						return;
614					}
615				}
616				event = self.notification_service.next_event().fuse() => {
617					if let Some(event) = event {
618						self.handle_notification_event(event).await
619					} else {
620						// `Notifications` has seemingly closed. Closing as well.
621						return
622					}
623				}
624				_ = &mut self.initial_sync_timeout => {
625					self.process_initial_sync_burst().await;
626					self.initial_sync_timeout =
627						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
628				},
629			}
630		}
631	}
632
633	/// Send a single chunk of statements to a peer.
634	async fn send_statement_chunk(
635		&mut self,
636		peer: &PeerId,
637		statements: &[&Statement],
638	) -> SendChunkResult {
639		match find_sendable_chunk(statements) {
640			ChunkResult::Send(0) => SendChunkResult::Empty,
641			ChunkResult::Send(chunk_end) => {
642				let chunk = &statements[..chunk_end];
643				let encoded = chunk.encode();
644				let bytes_to_send = encoded.len() as u64;
645
646				let sent_latency_timer =
647					self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
648				let send_result = timeout(
649					SEND_TIMEOUT,
650					self.notification_service.send_async_notification(peer, encoded),
651				)
652				.await;
653				drop(sent_latency_timer);
654
655				if let Err(e) = send_result {
656					log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
657					return SendChunkResult::Failed;
658				}
659
660				log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
661				self.metrics.as_ref().map(|metrics| {
662					metrics.propagated_statements.inc_by(chunk.len() as u64);
663					metrics.bytes_sent_total.inc_by(bytes_to_send);
664					metrics.propagated_statements_chunks.observe(chunk.len() as f64);
665				});
666				SendChunkResult::Sent(chunk_end)
667			},
668			ChunkResult::SkipOversized => {
669				log::warn!(target: LOG_TARGET, "Statement too large, skipping");
670				self.metrics.as_ref().map(|metrics| {
671					metrics.skipped_oversized_statements.inc();
672				});
673				SendChunkResult::Skipped
674			},
675		}
676	}
677
678	fn handle_sync_event(&mut self, event: SyncEvent) {
679		match event {
680			SyncEvent::PeerConnected(remote) => {
681				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
682					.collect::<multiaddr::Multiaddr>();
683				let result = self.network.add_peers_to_reserved_set(
684					self.protocol_name.clone(),
685					iter::once(addr).collect(),
686				);
687				if let Err(err) = result {
688					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
689				}
690			},
691			SyncEvent::PeerDisconnected(remote) => {
692				let result = self.network.remove_peers_from_reserved_set(
693					self.protocol_name.clone(),
694					iter::once(remote).collect(),
695				);
696				if let Err(err) = result {
697					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
698				}
699			},
700		}
701	}
702
703	async fn handle_notification_event(&mut self, event: NotificationEvent) {
704		match event {
705			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
706				// Only accept peers whose role can be determined
707				let result = self
708					.network
709					.peer_role(peer, handshake)
710					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
711				let _ = result_tx.send(result);
712			},
713			NotificationEvent::NotificationStreamOpened { peer, .. } => {
714				let _was_in = self.peers.insert(
715					peer,
716					Peer {
717						known_statements: LruHashSet::new(
718							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
719						),
720						rate_limiter: PeerRateLimiter::new(
721							self.statements_per_second,
722							NonZeroU32::new(
723								self.statements_per_second.get()
724									* config::STATEMENTS_BURST_COEFFICIENT,
725							)
726							.expect("burst capacity is nonzero"),
727						),
728					},
729				);
730				debug_assert!(_was_in.is_none());
731
732				self.metrics.as_ref().map(|metrics| {
733					metrics.peers_connected.set(self.peers.len() as u64);
734				});
735
736				if !self.sync.is_major_syncing() {
737					let hashes = self.statement_store.statement_hashes();
738					if !hashes.is_empty() {
739						self.pending_initial_syncs.insert(
740							peer,
741							PendingInitialSync { hashes, started_at: Instant::now() },
742						);
743						self.initial_sync_peer_queue.push_back(peer);
744						self.metrics.as_ref().map(|metrics| {
745							metrics.initial_sync_peers_active.inc();
746						});
747					}
748				}
749			},
750			NotificationEvent::NotificationStreamClosed { peer } => {
751				let _peer = self.peers.remove(&peer);
752				debug_assert!(_peer.is_some());
753				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
754					self.metrics.as_ref().map(|metrics| {
755						metrics.initial_sync_peers_active.dec();
756						metrics
757							.initial_sync_duration_seconds
758							.observe(pending.started_at.elapsed().as_secs_f64());
759					});
760				}
761				self.initial_sync_peer_queue.retain(|p| *p != peer);
762				self.metrics.as_ref().map(|metrics| {
763					metrics.peers_connected.set(self.peers.len() as u64);
764				});
765			},
766			NotificationEvent::NotificationReceived { peer, notification } => {
767				let bytes_received = notification.len() as u64;
768				self.metrics.as_ref().map(|metrics| {
769					metrics.bytes_received_total.inc_by(bytes_received);
770				});
771
772				// Accept statements only when node is not major syncing
773				if self.sync.is_major_syncing() {
774					log::trace!(
775						target: LOG_TARGET,
776						"{peer}: Ignoring statements while major syncing or offline"
777					);
778					return;
779				}
780
781				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
782					self.on_statements(peer, statements);
783				} else {
784					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
785				}
786			},
787		}
788	}
789
790	/// Called when peer sends us new statements
791	#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
792	pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
793		log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
794
795		self.metrics.as_ref().map(|metrics| {
796			metrics.statements_received.inc_by(statements.len() as u64);
797		});
798
799		if let Some(ref mut peer) = self.peers.get_mut(&who) {
800			if peer.rate_limiter.is_flooding(statements.len()) {
801				log::warn!(
802					target: LOG_TARGET,
803					"Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
804					who,
805					self.statements_per_second
806				);
807
808				self.network.report_peer(who, rep::STATEMENT_FLOODING);
809				self.network.disconnect_peer(who, self.protocol_name.clone());
810				if let Some(ref metrics) = self.metrics {
811					metrics.statement_flooding_detected.inc();
812				}
813
814				// Clean up peer state immediately
815				self.peers.remove(&who);
816				self.pending_initial_syncs.remove(&who);
817				self.initial_sync_peer_queue.retain(|p| *p != who);
818
819				return;
820			}
821
822			let mut statements_left = statements.len() as u64;
823			for s in statements {
824				if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
825					log::debug!(
826						target: LOG_TARGET,
827						"Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
828						statements_left,
829						MAX_PENDING_STATEMENTS,
830					);
831					self.metrics.as_ref().map(|metrics| {
832						metrics.ignored_statements.inc_by(statements_left);
833					});
834					break;
835				}
836
837				let hash = s.hash();
838				peer.known_statements.insert(hash);
839
840				if self.statement_store.has_statement(&hash) {
841					self.metrics.as_ref().map(|metrics| {
842						metrics.known_statements_received.inc();
843					});
844
845					if let Some(peers) = self.pending_statements_peers.get(&hash) {
846						if peers.contains(&who) {
847							log::trace!(
848								target: LOG_TARGET,
849								"Already received the statement from the same peer {who}.",
850							);
851							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
852						}
853					}
854					continue;
855				}
856
857				self.network.report_peer(who, rep::ANY_STATEMENT);
858
859				match self.pending_statements_peers.entry(hash) {
860					Entry::Vacant(entry) => {
861						let (completion_sender, completion_receiver) = oneshot::channel();
862						match self.queue_sender.try_send((s, completion_sender)) {
863							Ok(()) => {
864								self.pending_statements.push(
865									async move {
866										let res = completion_receiver.await;
867										(hash, res.ok())
868									}
869									.boxed(),
870								);
871								entry.insert(HashSet::from_iter([who]));
872							},
873							Err(async_channel::TrySendError::Full(_)) => {
874								log::debug!(
875									target: LOG_TARGET,
876									"Dropped statement because validation channel is full",
877								);
878							},
879							Err(async_channel::TrySendError::Closed(_)) => {
880								log::trace!(
881									target: LOG_TARGET,
882									"Dropped statement because validation channel is closed",
883								);
884							},
885						}
886					},
887					Entry::Occupied(mut entry) => {
888						if !entry.get_mut().insert(who) {
889							// Already received this from the same peer.
890							self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
891						}
892					},
893				}
894
895				statements_left -= 1;
896			}
897		}
898	}
899
900	fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
901		match import {
902			SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
903			SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
904			SubmitResult::KnownExpired => {},
905			SubmitResult::Rejected(_) => {},
906			SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
907			SubmitResult::InternalError(_) => {},
908		}
909	}
910
911	/// Propagate one statement.
912	pub async fn propagate_statement(&mut self, hash: &Hash) {
913		// Accept statements only when node is not major syncing
914		if self.sync.is_major_syncing() {
915			return;
916		}
917
918		log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
919		if let Ok(Some(statement)) = self.statement_store.statement(hash) {
920			self.do_propagate_statements(&[(*hash, statement)]).await;
921		}
922	}
923
924	/// Propagate the given `statements` to the given `peer`.
925	///
926	/// Internally filters `statements` to only send unknown statements to the peer.
927	async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
928		let Some(peer) = self.peers.get_mut(who) else {
929			return;
930		};
931
932		let to_send: Vec<_> = statements
933			.iter()
934			.filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
935			.collect();
936
937		log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
938
939		if to_send.is_empty() {
940			return;
941		}
942
943		self.send_statements_in_chunks(who, &to_send).await;
944	}
945
946	/// Send statements to a peer in chunks, respecting the maximum notification size.
947	async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
948		let mut offset = 0;
949		while offset < statements.len() {
950			match self.send_statement_chunk(who, &statements[offset..]).await {
951				SendChunkResult::Sent(chunk_end) => {
952					offset += chunk_end;
953				},
954				SendChunkResult::Skipped => {
955					offset += 1;
956				},
957				SendChunkResult::Empty | SendChunkResult::Failed => return,
958			}
959		}
960	}
961
962	async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
963		log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
964		let peers: Vec<_> = self.peers.keys().copied().collect();
965		for who in peers {
966			log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
967			self.send_statements_to_peer(&who, statements).await;
968		}
969		log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
970	}
971
972	/// Call when we must propagate ready statements to peers.
973	async fn propagate_statements(&mut self) {
974		// Send out statements only when node is not major syncing
975		if self.sync.is_major_syncing() {
976			return;
977		}
978
979		let Ok(statements) = self.statement_store.take_recent_statements() else { return };
980		if !statements.is_empty() {
981			self.do_propagate_statements(&statements).await;
982		}
983	}
984
985	/// Record initial sync completion metrics for a peer being removed.
986	fn record_initial_sync_completion(&self, started_at: Instant) {
987		self.metrics.as_ref().map(|metrics| {
988			metrics.initial_sync_peers_active.dec();
989			metrics
990				.initial_sync_duration_seconds
991				.observe(started_at.elapsed().as_secs_f64());
992		});
993	}
994
995	/// Process one batch of initial sync for the next peer in the queue (round-robin).
996	async fn process_initial_sync_burst(&mut self) {
997		if self.sync.is_major_syncing() {
998			return;
999		}
1000
1001		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
1002			return;
1003		};
1004
1005		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
1006			return;
1007		};
1008
1009		self.metrics.as_ref().map(|metrics| {
1010			metrics.initial_sync_bursts_total.inc();
1011		});
1012
1013		if entry.get().hashes.is_empty() {
1014			let started_at = entry.get().started_at;
1015			entry.remove();
1016			self.record_initial_sync_completion(started_at);
1017			return;
1018		}
1019
1020		// Fetch statements up to max_statement_payload_size (reserves space for vec encoding)
1021		let max_size = max_statement_payload_size();
1022		let mut accumulated_size = 0;
1023		let (statements, processed) = match self.statement_store.statements_by_hashes(
1024			&entry.get().hashes,
1025			&mut |_hash, encoded, _stmt| {
1026				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
1027					return FilterDecision::Abort;
1028				}
1029				accumulated_size += encoded.len();
1030				FilterDecision::Take
1031			},
1032		) {
1033			Ok(r) => r,
1034			Err(e) => {
1035				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
1036				let started_at = entry.get().started_at;
1037				entry.remove();
1038				self.record_initial_sync_completion(started_at);
1039				return;
1040			},
1041		};
1042
1043		// Drain processed hashes and check if more remain
1044		entry.get_mut().hashes.drain(..processed);
1045		let has_more = !entry.get().hashes.is_empty();
1046		drop(entry);
1047
1048		// Send statements (already sized to fit in one message)
1049		let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
1050		match self.send_statement_chunk(&peer_id, &to_send).await {
1051			SendChunkResult::Failed => {
1052				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1053					self.record_initial_sync_completion(pending.started_at);
1054				}
1055				return;
1056			},
1057			SendChunkResult::Sent(sent) => {
1058				debug_assert_eq!(to_send.len(), sent);
1059				self.metrics.as_ref().map(|metrics| {
1060					metrics.initial_sync_statements_sent.inc_by(sent as u64);
1061				});
1062				// Mark statements as known
1063				if let Some(peer) = self.peers.get_mut(&peer_id) {
1064					for (hash, _) in &statements {
1065						peer.known_statements.insert(*hash);
1066					}
1067				}
1068			},
1069			SendChunkResult::Empty | SendChunkResult::Skipped => {},
1070		}
1071
1072		// Re-queue if more hashes remain
1073		if has_more {
1074			self.initial_sync_peer_queue.push_back(peer_id);
1075		} else {
1076			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1077				self.record_initial_sync_completion(pending.started_at);
1078			}
1079		}
1080	}
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085
1086	use super::*;
1087	use std::sync::Mutex;
1088
1089	#[derive(Clone)]
1090	struct TestNetwork {
1091		reported_peers: Arc<Mutex<Vec<(PeerId, soil_network::ReputationChange)>>>,
1092		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
1093	}
1094
1095	impl TestNetwork {
1096		fn new() -> Self {
1097			Self {
1098				reported_peers: Arc::new(Mutex::new(Vec::new())),
1099				disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1100			}
1101		}
1102
1103		fn get_reports(&self) -> Vec<(PeerId, soil_network::ReputationChange)> {
1104			self.reported_peers.lock().unwrap().clone()
1105		}
1106
1107		fn get_disconnected_peers(&self) -> Vec<PeerId> {
1108			self.disconnected_peers.lock().unwrap().clone()
1109		}
1110	}
1111
1112	#[async_trait::async_trait]
1113	impl NetworkPeers for TestNetwork {
1114		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
1115			unimplemented!()
1116		}
1117
1118		fn set_authorized_only(&self, _: bool) {
1119			unimplemented!()
1120		}
1121
1122		fn add_known_address(&self, _: PeerId, _: soil_network::Multiaddr) {
1123			unimplemented!()
1124		}
1125
1126		fn report_peer(&self, peer_id: PeerId, cost_benefit: soil_network::ReputationChange) {
1127			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
1128		}
1129
1130		fn peer_reputation(&self, _: &PeerId) -> i32 {
1131			unimplemented!()
1132		}
1133
1134		fn disconnect_peer(&self, peer: PeerId, _: soil_network::ProtocolName) {
1135			self.disconnected_peers.lock().unwrap().push(peer);
1136		}
1137
1138		fn accept_unreserved_peers(&self) {
1139			unimplemented!()
1140		}
1141
1142		fn deny_unreserved_peers(&self) {
1143			unimplemented!()
1144		}
1145
1146		fn add_reserved_peer(
1147			&self,
1148			_: soil_network::config::MultiaddrWithPeerId,
1149		) -> Result<(), String> {
1150			unimplemented!()
1151		}
1152
1153		fn remove_reserved_peer(&self, _: PeerId) {
1154			unimplemented!()
1155		}
1156
1157		fn set_reserved_peers(
1158			&self,
1159			_: soil_network::ProtocolName,
1160			_: std::collections::HashSet<soil_network::Multiaddr>,
1161		) -> Result<(), String> {
1162			unimplemented!()
1163		}
1164
1165		fn add_peers_to_reserved_set(
1166			&self,
1167			_: soil_network::ProtocolName,
1168			_: std::collections::HashSet<soil_network::Multiaddr>,
1169		) -> Result<(), String> {
1170			unimplemented!()
1171		}
1172
1173		fn remove_peers_from_reserved_set(
1174			&self,
1175			_: soil_network::ProtocolName,
1176			_: Vec<PeerId>,
1177		) -> Result<(), String> {
1178			unimplemented!()
1179		}
1180
1181		fn sync_num_connected(&self) -> usize {
1182			unimplemented!()
1183		}
1184
1185		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<soil_network::ObservedRole> {
1186			unimplemented!()
1187		}
1188
1189		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
1190			unimplemented!();
1191		}
1192	}
1193
1194	struct TestSync {}
1195
1196	impl SyncEventStream for TestSync {
1197		fn event_stream(
1198			&self,
1199			_name: &'static str,
1200		) -> Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>> {
1201			unimplemented!()
1202		}
1203	}
1204
1205	impl soil_client::consensus::SyncOracle for TestSync {
1206		fn is_major_syncing(&self) -> bool {
1207			false
1208		}
1209
1210		fn is_offline(&self) -> bool {
1211			unimplemented!()
1212		}
1213	}
1214
1215	impl NetworkEventStream for TestNetwork {
1216		fn event_stream(
1217			&self,
1218			_name: &'static str,
1219		) -> Pin<Box<dyn Stream<Item = soil_network::Event> + Send>> {
1220			unimplemented!()
1221		}
1222	}
1223
1224	#[derive(Debug, Clone)]
1225	struct TestNotificationService {
1226		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
1227	}
1228
1229	impl TestNotificationService {
1230		fn new() -> Self {
1231			Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1232		}
1233
1234		fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1235			self.sent_notifications.lock().unwrap().clone()
1236		}
1237	}
1238
1239	#[async_trait::async_trait]
1240	impl NotificationService for TestNotificationService {
1241		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1242			unimplemented!()
1243		}
1244
1245		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1246			unimplemented!()
1247		}
1248
1249		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
1250			self.sent_notifications.lock().unwrap().push((*peer, notification));
1251		}
1252
1253		async fn send_async_notification(
1254			&mut self,
1255			peer: &PeerId,
1256			notification: Vec<u8>,
1257		) -> Result<(), soil_network::error::Error> {
1258			self.sent_notifications.lock().unwrap().push((*peer, notification));
1259			Ok(())
1260		}
1261
1262		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1263			unimplemented!()
1264		}
1265
1266		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1267			unimplemented!()
1268		}
1269
1270		async fn next_event(&mut self) -> Option<soil_network::service::traits::NotificationEvent> {
1271			None
1272		}
1273
1274		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
1275			unimplemented!()
1276		}
1277
1278		fn protocol(&self) -> &soil_network::types::ProtocolName {
1279			unimplemented!()
1280		}
1281
1282		fn message_sink(
1283			&self,
1284			_peer: &PeerId,
1285		) -> Option<Box<dyn soil_network::service::traits::MessageSink>> {
1286			unimplemented!()
1287		}
1288	}
1289
1290	#[derive(Clone)]
1291	struct TestStatementStore {
1292		statements:
1293			Arc<Mutex<HashMap<soil_statement_store::Hash, soil_statement_store::Statement>>>,
1294		recent_statements:
1295			Arc<Mutex<HashMap<soil_statement_store::Hash, soil_statement_store::Statement>>>,
1296	}
1297
1298	impl TestStatementStore {
1299		fn new() -> Self {
1300			Self { statements: Default::default(), recent_statements: Default::default() }
1301		}
1302	}
1303
1304	impl StatementStore for TestStatementStore {
1305		fn statements(
1306			&self,
1307		) -> soil_statement_store::Result<
1308			Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
1309		> {
1310			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
1311		}
1312
1313		fn take_recent_statements(
1314			&self,
1315		) -> soil_statement_store::Result<
1316			Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
1317		> {
1318			Ok(self.recent_statements.lock().unwrap().drain().collect())
1319		}
1320
1321		fn statement(
1322			&self,
1323			_hash: &soil_statement_store::Hash,
1324		) -> soil_statement_store::Result<Option<soil_statement_store::Statement>> {
1325			unimplemented!()
1326		}
1327
1328		fn has_statement(&self, hash: &soil_statement_store::Hash) -> bool {
1329			self.statements.lock().unwrap().contains_key(hash)
1330		}
1331
1332		fn statement_hashes(&self) -> Vec<soil_statement_store::Hash> {
1333			self.statements.lock().unwrap().keys().cloned().collect()
1334		}
1335
1336		fn statements_by_hashes(
1337			&self,
1338			hashes: &[soil_statement_store::Hash],
1339			filter: &mut dyn FnMut(
1340				&soil_statement_store::Hash,
1341				&[u8],
1342				&soil_statement_store::Statement,
1343			) -> FilterDecision,
1344		) -> soil_statement_store::Result<(
1345			Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
1346			usize,
1347		)> {
1348			let statements = self.statements.lock().unwrap();
1349			let mut result = Vec::new();
1350			let mut processed = 0;
1351			for hash in hashes {
1352				let Some(stmt) = statements.get(hash) else {
1353					processed += 1;
1354					continue;
1355				};
1356				let encoded = stmt.encode();
1357				match filter(hash, &encoded, stmt) {
1358					FilterDecision::Skip => {
1359						processed += 1;
1360					},
1361					FilterDecision::Take => {
1362						processed += 1;
1363						result.push((*hash, stmt.clone()));
1364					},
1365					FilterDecision::Abort => break,
1366				}
1367			}
1368			Ok((result, processed))
1369		}
1370
1371		fn broadcasts(
1372			&self,
1373			_match_all_topics: &[soil_statement_store::Topic],
1374		) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1375			unimplemented!()
1376		}
1377
1378		fn posted(
1379			&self,
1380			_match_all_topics: &[soil_statement_store::Topic],
1381			_dest: [u8; 32],
1382		) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1383			unimplemented!()
1384		}
1385
1386		fn posted_clear(
1387			&self,
1388			_match_all_topics: &[soil_statement_store::Topic],
1389			_dest: [u8; 32],
1390		) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1391			unimplemented!()
1392		}
1393
1394		fn broadcasts_stmt(
1395			&self,
1396			_match_all_topics: &[soil_statement_store::Topic],
1397		) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1398			unimplemented!()
1399		}
1400
1401		fn posted_stmt(
1402			&self,
1403			_match_all_topics: &[soil_statement_store::Topic],
1404			_dest: [u8; 32],
1405		) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1406			unimplemented!()
1407		}
1408
1409		fn posted_clear_stmt(
1410			&self,
1411			_match_all_topics: &[soil_statement_store::Topic],
1412			_dest: [u8; 32],
1413		) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1414			unimplemented!()
1415		}
1416
1417		fn submit(
1418			&self,
1419			_statement: soil_statement_store::Statement,
1420			_source: soil_statement_store::StatementSource,
1421		) -> soil_statement_store::SubmitResult {
1422			unimplemented!()
1423		}
1424
1425		fn remove(&self, _hash: &soil_statement_store::Hash) -> soil_statement_store::Result<()> {
1426			unimplemented!()
1427		}
1428
1429		fn remove_by(&self, _who: [u8; 32]) -> soil_statement_store::Result<()> {
1430			unimplemented!()
1431		}
1432	}
1433
1434	fn build_handler() -> (
1435		StatementHandler<TestNetwork, TestSync>,
1436		TestStatementStore,
1437		TestNetwork,
1438		TestNotificationService,
1439		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
1440	) {
1441		let statement_store = TestStatementStore::new();
1442		let (queue_sender, queue_receiver) = async_channel::bounded(2);
1443		let network = TestNetwork::new();
1444		let notification_service = TestNotificationService::new();
1445		let peer_id = PeerId::random();
1446		let mut peers = HashMap::new();
1447		peers.insert(
1448			peer_id,
1449			Peer {
1450				known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
1451				rate_limiter: PeerRateLimiter::new(
1452					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1453						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1454					NonZeroU32::new(
1455						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
1456					)
1457					.expect("burst capacity is nonzero"),
1458				),
1459			},
1460		);
1461
1462		let handler = StatementHandler {
1463			protocol_name: "/statement/1".into(),
1464			notification_service: Box::new(notification_service.clone()),
1465			propagate_timeout: (Box::pin(futures::stream::pending())
1466				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1467				.fuse(),
1468			pending_statements: FuturesUnordered::new(),
1469			pending_statements_peers: HashMap::new(),
1470			network: network.clone(),
1471			sync: TestSync {},
1472			sync_event_stream: (Box::pin(futures::stream::pending())
1473				as Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>>)
1474				.fuse(),
1475			peers,
1476			statement_store: Arc::new(statement_store.clone()),
1477			queue_sender,
1478			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1479				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1480			metrics: None,
1481			initial_sync_timeout: Box::pin(futures::future::pending()),
1482			pending_initial_syncs: HashMap::new(),
1483			initial_sync_peer_queue: VecDeque::new(),
1484		};
1485		(handler, statement_store, network, notification_service, queue_receiver)
1486	}
1487
1488	#[tokio::test]
1489	async fn test_skips_processing_statements_that_already_in_store() {
1490		let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
1491			build_handler();
1492
1493		let mut statement1 = Statement::new();
1494		statement1.set_plain_data(b"statement1".to_vec());
1495		let hash1 = statement1.hash();
1496
1497		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
1498
1499		let mut statement2 = Statement::new();
1500		statement2.set_plain_data(b"statement2".to_vec());
1501		let hash2 = statement2.hash();
1502
1503		let peer_id = *handler.peers.keys().next().unwrap();
1504
1505		handler.on_statements(peer_id, vec![statement1, statement2]);
1506
1507		let to_submit = queue_receiver.try_recv();
1508		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
1509
1510		let no_more = queue_receiver.try_recv();
1511		assert!(no_more.is_err(), "Expected only one statement to be queued");
1512	}
1513
1514	#[tokio::test]
1515	async fn test_reports_for_duplicate_statements() {
1516		let (mut handler, statement_store, network, _notification_service, queue_receiver) =
1517			build_handler();
1518
1519		let peer_id = *handler.peers.keys().next().unwrap();
1520
1521		let mut statement1 = Statement::new();
1522		statement1.set_plain_data(b"statement1".to_vec());
1523
1524		handler.on_statements(peer_id, vec![statement1.clone()]);
1525		{
1526			// Manually process statements submission
1527			let (s, _) = queue_receiver.try_recv().unwrap();
1528			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1529			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1530		}
1531
1532		handler.on_statements(peer_id, vec![statement1]);
1533
1534		let reports = network.get_reports();
1535		assert_eq!(
1536			reports,
1537			vec![
1538				(peer_id, rep::ANY_STATEMENT),        // Report for first statement
1539				(peer_id, rep::ANY_STATEMENT_REFUND), // Refund for first statement
1540				(peer_id, rep::DUPLICATE_STATEMENT)   // Report for duplicate statement
1541			],
1542			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1543			reports
1544		);
1545	}
1546
1547	#[tokio::test]
1548	async fn test_splits_large_batches_into_smaller_chunks() {
1549		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1550			build_handler();
1551
1552		let num_statements = 30;
1553		let statement_size = 100 * 1024; // 100KB per statement
1554		for i in 0..num_statements {
1555			let mut statement = Statement::new();
1556			let mut data = vec![0u8; statement_size];
1557			data[0] = i as u8;
1558			statement.set_plain_data(data);
1559			let hash = statement.hash();
1560			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1561		}
1562
1563		handler.propagate_statements().await;
1564
1565		let sent = notification_service.get_sent_notifications();
1566		let mut total_statements_sent = 0;
1567		assert!(
1568			sent.len() == 3,
1569			"Expected batch to be split into 3 chunks, but got {} chunks",
1570			sent.len()
1571		);
1572		for (_peer, notification) in sent.iter() {
1573			assert!(
1574				notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1575				"Notification size {} exceeds limit {}",
1576				notification.len(),
1577				MAX_STATEMENT_NOTIFICATION_SIZE
1578			);
1579			if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1580				total_statements_sent += stmts.len();
1581			}
1582		}
1583
1584		assert_eq!(
1585			total_statements_sent, num_statements,
1586			"Expected all {} statements to be sent, but only {} were sent",
1587			num_statements, total_statements_sent
1588		);
1589	}
1590
1591	#[tokio::test]
1592	async fn test_skips_only_oversized_statements() {
1593		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1594			build_handler();
1595
1596		let mut statement1 = Statement::new();
1597		statement1.set_plain_data(vec![1u8; 100]);
1598		let hash1 = statement1.hash();
1599		statement_store
1600			.recent_statements
1601			.lock()
1602			.unwrap()
1603			.insert(hash1, statement1.clone());
1604
1605		let mut oversized1 = Statement::new();
1606		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1607		let hash_oversized1 = oversized1.hash();
1608		statement_store
1609			.recent_statements
1610			.lock()
1611			.unwrap()
1612			.insert(hash_oversized1, oversized1);
1613
1614		let mut statement2 = Statement::new();
1615		statement2.set_plain_data(vec![3u8; 100]);
1616		let hash2 = statement2.hash();
1617		statement_store
1618			.recent_statements
1619			.lock()
1620			.unwrap()
1621			.insert(hash2, statement2.clone());
1622
1623		let mut oversized2 = Statement::new();
1624		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1625		let hash_oversized2 = oversized2.hash();
1626		statement_store
1627			.recent_statements
1628			.lock()
1629			.unwrap()
1630			.insert(hash_oversized2, oversized2);
1631
1632		let mut statement3 = Statement::new();
1633		statement3.set_plain_data(vec![5u8; 100]);
1634		let hash3 = statement3.hash();
1635		statement_store
1636			.recent_statements
1637			.lock()
1638			.unwrap()
1639			.insert(hash3, statement3.clone());
1640
1641		handler.propagate_statements().await;
1642
1643		let sent = notification_service.get_sent_notifications();
1644
1645		let mut sent_hashes = sent
1646			.iter()
1647			.flat_map(|(_peer, notification)| {
1648				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1649			})
1650			.map(|s| s.hash())
1651			.collect::<Vec<_>>();
1652		sent_hashes.sort();
1653		let mut expected_hashes = vec![hash1, hash2, hash3];
1654		expected_hashes.sort();
1655		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1656	}
1657
1658	fn build_handler_no_peers() -> (
1659		StatementHandler<TestNetwork, TestSync>,
1660		TestStatementStore,
1661		TestNetwork,
1662		TestNotificationService,
1663	) {
1664		let statement_store = TestStatementStore::new();
1665		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
1666		let network = TestNetwork::new();
1667		let notification_service = TestNotificationService::new();
1668
1669		let handler = StatementHandler {
1670			protocol_name: "/statement/1".into(),
1671			notification_service: Box::new(notification_service.clone()),
1672			propagate_timeout: (Box::pin(futures::stream::pending())
1673				as Pin<Box<dyn Stream<Item = ()> + Send>>)
1674				.fuse(),
1675			pending_statements: FuturesUnordered::new(),
1676			pending_statements_peers: HashMap::new(),
1677			network: network.clone(),
1678			sync: TestSync {},
1679			sync_event_stream: (Box::pin(futures::stream::pending())
1680				as Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>>)
1681				.fuse(),
1682			peers: HashMap::new(),
1683			statement_store: Arc::new(statement_store.clone()),
1684			queue_sender,
1685			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1686				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1687			metrics: None,
1688			initial_sync_timeout: Box::pin(futures::future::pending()),
1689			pending_initial_syncs: HashMap::new(),
1690			initial_sync_peer_queue: VecDeque::new(),
1691		};
1692		(handler, statement_store, network, notification_service)
1693	}
1694
1695	#[tokio::test]
1696	async fn test_initial_sync_burst_single_peer() {
1697		let (mut handler, statement_store, _network, notification_service) =
1698			build_handler_no_peers();
1699
1700		// Create 20MB of statements (200 statements x 100KB each)
1701		// Using 100KB ensures ~10 statements per 1MB batch, requiring ~20 bursts
1702		let num_statements = 200;
1703		let statement_size = 100 * 1024; // 100KB per statement
1704		let mut expected_hashes = Vec::new();
1705		for i in 0..num_statements {
1706			let mut statement = Statement::new();
1707			let mut data = vec![0u8; statement_size];
1708			// Use multiple bytes for uniqueness since we have >255 statements
1709			data[0] = (i % 256) as u8;
1710			data[1] = (i / 256) as u8;
1711			statement.set_plain_data(data);
1712			let hash = statement.hash();
1713			expected_hashes.push(hash);
1714			statement_store.statements.lock().unwrap().insert(hash, statement);
1715		}
1716
1717		// Setup peer and simulate connection
1718		let peer_id = PeerId::random();
1719
1720		handler
1721			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
1722				peer: peer_id,
1723				direction: soil_network::service::traits::Direction::Inbound,
1724				handshake: vec![],
1725				negotiated_fallback: None,
1726			})
1727			.await;
1728
1729		// Verify peer was added and initial sync was queued
1730		assert!(handler.peers.contains_key(&peer_id));
1731		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
1732		assert_eq!(handler.initial_sync_peer_queue.len(), 1);
1733
1734		// Process bursts until all statements are sent
1735		let mut burst_count = 0;
1736		while handler.pending_initial_syncs.contains_key(&peer_id) {
1737			handler.process_initial_sync_burst().await;
1738			burst_count += 1;
1739			// Safety limit
1740			assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
1741		}
1742
1743		// Verify multiple bursts were needed
1744		// With 200 statements x 100KB each and ~1MB per batch, we expect many bursts
1745		assert!(
1746			burst_count >= 10,
1747			"Expected multiple bursts for 200 statements of 100KB each, got {}",
1748			burst_count
1749		);
1750
1751		// Verify all statements were sent
1752		let sent = notification_service.get_sent_notifications();
1753		let mut sent_hashes: Vec<_> = sent
1754			.iter()
1755			.flat_map(|(peer, notification)| {
1756				assert_eq!(*peer, peer_id);
1757				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1758			})
1759			.map(|s| s.hash())
1760			.collect();
1761		sent_hashes.sort();
1762		expected_hashes.sort();
1763
1764		assert_eq!(
1765			sent_hashes.len(),
1766			expected_hashes.len(),
1767			"Expected {} statements to be sent, got {}",
1768			expected_hashes.len(),
1769			sent_hashes.len()
1770		);
1771		assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
1772
1773		// Verify cleanup
1774		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
1775		assert!(handler.initial_sync_peer_queue.is_empty());
1776	}
1777
1778	#[tokio::test]
1779	async fn test_initial_sync_burst_multiple_peers_round_robin() {
1780		let (mut handler, statement_store, _network, notification_service) =
1781			build_handler_no_peers();
1782
1783		// Create 20MB of statements (200 statements x 100KB each)
1784		let num_statements = 200;
1785		let statement_size = 100 * 1024; // 100KB per statement
1786		let mut expected_hashes = Vec::new();
1787		for i in 0..num_statements {
1788			let mut statement = Statement::new();
1789			let mut data = vec![0u8; statement_size];
1790			data[0] = (i % 256) as u8;
1791			data[1] = (i / 256) as u8;
1792			statement.set_plain_data(data);
1793			let hash = statement.hash();
1794			expected_hashes.push(hash);
1795			statement_store.statements.lock().unwrap().insert(hash, statement);
1796		}
1797
1798		// Setup 3 peers and simulate connections
1799		let peer1 = PeerId::random();
1800		let peer2 = PeerId::random();
1801		let peer3 = PeerId::random();
1802
1803		// Connect peers
1804		for peer in [peer1, peer2, peer3] {
1805			handler
1806				.handle_notification_event(NotificationEvent::NotificationStreamOpened {
1807					peer,
1808					direction: soil_network::service::traits::Direction::Inbound,
1809					handshake: vec![],
1810					negotiated_fallback: None,
1811				})
1812				.await;
1813		}
1814
1815		// Verify all peers were added and initial syncs were queued
1816		assert_eq!(handler.peers.len(), 3);
1817		assert_eq!(handler.pending_initial_syncs.len(), 3);
1818		assert_eq!(handler.initial_sync_peer_queue.len(), 3);
1819
1820		// Track which peer was processed on each burst for round-robin verification
1821		let mut peer_burst_order = Vec::new();
1822		let mut burst_count = 0;
1823
1824		while !handler.pending_initial_syncs.is_empty() {
1825			// Record which peer will be processed next
1826			if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
1827				peer_burst_order.push(next_peer);
1828			}
1829			handler.process_initial_sync_burst().await;
1830			burst_count += 1;
1831			// Safety limit
1832			assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
1833		}
1834
1835		// Verify multiple bursts were needed
1836		// With 3 peers and many bursts per peer, we expect many bursts total
1837		assert!(
1838			burst_count >= 30,
1839			"Expected many bursts for 3 peers with 200 statements each, got {}",
1840			burst_count
1841		);
1842
1843		// Verify round-robin pattern in first 9 bursts (3 peers x 3 rounds)
1844		assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
1845		// First round
1846		assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
1847		assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
1848		assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
1849		// Second round
1850		assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
1851		assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
1852		assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
1853
1854		// Verify all peers received all statements
1855		let sent = notification_service.get_sent_notifications();
1856		let mut peer1_hashes: Vec<_> = sent
1857			.iter()
1858			.filter(|(peer, _)| *peer == peer1)
1859			.flat_map(|(_, notification)| {
1860				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1861			})
1862			.map(|s| s.hash())
1863			.collect();
1864		let mut peer2_hashes: Vec<_> = sent
1865			.iter()
1866			.filter(|(peer, _)| *peer == peer2)
1867			.flat_map(|(_, notification)| {
1868				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1869			})
1870			.map(|s| s.hash())
1871			.collect();
1872		let mut peer3_hashes: Vec<_> = sent
1873			.iter()
1874			.filter(|(peer, _)| *peer == peer3)
1875			.flat_map(|(_, notification)| {
1876				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1877			})
1878			.map(|s| s.hash())
1879			.collect();
1880
1881		peer1_hashes.sort();
1882		peer2_hashes.sort();
1883		peer3_hashes.sort();
1884		expected_hashes.sort();
1885
1886		assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
1887		assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
1888		assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
1889
1890		// Verify cleanup
1891		assert!(handler.pending_initial_syncs.is_empty());
1892		assert!(handler.initial_sync_peer_queue.is_empty());
1893	}
1894
1895	#[tokio::test]
1896	async fn test_send_statements_in_chunks_exact_max_size() {
1897		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1898			build_handler();
1899
1900		// Calculate the data sizes so that 100 statements together exactly fill max_size.
1901		// This tests that all 100 statements fit in a single notification.
1902		//
1903		// The limit check in find_sendable_chunk is:
1904		//   max_size = MAX_STATEMENT_NOTIFICATION_SIZE - Compact::<u32>::max_encoded_len()
1905		//
1906		// Statement encoding (encodes as Vec<Field>):
1907		// - Compact<u32> for number of fields (1 byte for value 2: expiry + data)
1908		// - Field::Expiry discriminant (1 byte, value 2)
1909		// - u64 expiry value (8 bytes)
1910		// - Field::Data discriminant (1 byte, value 8)
1911		// - Compact<u32> for the data length (2 bytes for small data)
1912		// So per-statement overhead = 1 + 1 + 8 + 1 + 2 = 13 bytes
1913		let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
1914		let num_statements: usize = 100;
1915		let per_statement_overhead = 1 + 1 + 8 + 1 + 2; // Vec<Field> length + expiry field + data discriminant + Compact data length
1916		let total_overhead = per_statement_overhead * num_statements;
1917		let total_data_size = max_size - total_overhead;
1918		let per_statement_data_size = total_data_size / num_statements;
1919		let remainder = total_data_size % num_statements;
1920
1921		let mut expected_hashes = Vec::with_capacity(num_statements);
1922		let mut total_encoded_size = 0;
1923
1924		for i in 0..num_statements {
1925			let mut statement = Statement::new();
1926			// Distribute remainder across first `remainder` statements to exactly fill max_size
1927			let extra = if i < remainder { 1 } else { 0 };
1928			let mut data = vec![42u8; per_statement_data_size + extra];
1929			// Make each statement unique by modifying the first few bytes
1930			data[0] = i as u8;
1931			data[1] = (i >> 8) as u8;
1932			statement.set_plain_data(data);
1933
1934			total_encoded_size += statement.encoded_size();
1935
1936			let hash = statement.hash();
1937			expected_hashes.push(hash);
1938			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1939		}
1940
1941		// Verify our calculation: total encoded size should be <= max_size
1942		assert!(
1943			total_encoded_size == max_size,
1944			"Total encoded size {} should be <= max_size {}",
1945			total_encoded_size,
1946			max_size
1947		);
1948
1949		handler.propagate_statements().await;
1950
1951		let sent = notification_service.get_sent_notifications();
1952
1953		// All statements should fit in a single chunk
1954		assert_eq!(
1955			sent.len(),
1956			1,
1957			"Expected 1 notification for all {} statements, but got {}",
1958			num_statements,
1959			sent.len()
1960		);
1961
1962		let (_peer, notification) = &sent[0];
1963		assert!(
1964			notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1965			"Notification size {} exceeds limit {}",
1966			notification.len(),
1967			MAX_STATEMENT_NOTIFICATION_SIZE
1968		);
1969
1970		let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
1971		assert_eq!(
1972			decoded.len(),
1973			num_statements,
1974			"Expected {} statements in the notification",
1975			num_statements
1976		);
1977
1978		// Verify all statements were sent (order may differ due to HashMap iteration)
1979		let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
1980		expected_hashes.sort();
1981		received_hashes.sort();
1982		assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
1983	}
1984
1985	#[tokio::test]
1986	async fn test_initial_sync_burst_size_limit_consistency() {
1987		// This test verifies that process_initial_sync_burst and find_sendable_chunk
1988		// use the same size limit (max_statement_payload_size).
1989		//
1990		// Previously there was a bug where the filter in process_initial_sync_burst used
1991		// MAX_STATEMENT_NOTIFICATION_SIZE, but find_sendable_chunk reserved extra space
1992		// for Compact::<u32>::max_encoded_len(). This caused a debug_assert failure when
1993		// statements fit the filter but not find_sendable_chunk.
1994		//
1995		// With the fix, both use max_statement_payload_size(), so the filter will reject
1996		// statements that wouldn't fit in find_sendable_chunk.
1997		let (mut handler, statement_store, _network, notification_service) =
1998			build_handler_no_peers();
1999
2000		let payload_limit = max_statement_payload_size();
2001
2002		// Create first statement that's just over half the payload limit
2003		let first_stmt_data_size = payload_limit / 2 + 10;
2004		let mut stmt1 = Statement::new();
2005		stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
2006		let stmt1_encoded_size = stmt1.encoded_size();
2007
2008		// Create second statement that, combined with the first, exceeds the payload limit.
2009		// This means the filter will only accept the first statement.
2010		let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
2011		let target_stmt2_encoded = remaining + 3; // 3 bytes over limit when combined
2012		let stmt2_data_size = target_stmt2_encoded.saturating_sub(4); // ~4 bytes encoding overhead
2013		let mut stmt2 = Statement::new();
2014		stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
2015		let stmt2_encoded_size = stmt2.encoded_size();
2016
2017		let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
2018
2019		// Verify our setup: total exceeds payload limit
2020		assert!(
2021			total_encoded > payload_limit,
2022			"Total {} should exceed payload_limit {} so filter rejects second statement",
2023			total_encoded,
2024			payload_limit
2025		);
2026
2027		let hash1 = stmt1.hash();
2028		let hash2 = stmt2.hash();
2029		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
2030		statement_store.statements.lock().unwrap().insert(hash2, stmt2);
2031
2032		// Setup peer and simulate connection
2033		let peer_id = PeerId::random();
2034
2035		handler
2036			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
2037				peer: peer_id,
2038				direction: soil_network::service::traits::Direction::Inbound,
2039				handshake: vec![],
2040				negotiated_fallback: None,
2041			})
2042			.await;
2043
2044		// Verify initial sync was queued with both hashes
2045		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2046		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
2047
2048		// Process first burst - should send only one statement (the other doesn't fit)
2049		handler.process_initial_sync_burst().await;
2050
2051		// With the fix, the filter and find_sendable_chunk use the same limit,
2052		// so no assertion failure occurs. Only one statement is fetched and sent.
2053		let sent = notification_service.get_sent_notifications();
2054		assert_eq!(sent.len(), 1, "First burst should send one notification");
2055
2056		let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
2057		assert_eq!(decoded.len(), 1, "First notification should contain one statement");
2058
2059		// Verify one of the two statements was sent (order is non-deterministic due to HashMap)
2060		let sent_hash = decoded[0].hash();
2061		assert!(
2062			sent_hash == hash1 || sent_hash == hash2,
2063			"Sent statement should be one of the two created"
2064		);
2065
2066		// Second statement should still be pending
2067		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2068		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
2069
2070		// Process second burst - should send the remaining statement
2071		handler.process_initial_sync_burst().await;
2072
2073		let sent = notification_service.get_sent_notifications();
2074		assert_eq!(sent.len(), 2, "Second burst should send another notification");
2075
2076		// Both statements should now be sent
2077		let mut sent_hashes: Vec<_> = sent
2078			.iter()
2079			.flat_map(|(_, notification)| {
2080				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2081			})
2082			.map(|s| s.hash())
2083			.collect();
2084		sent_hashes.sort();
2085		let mut expected_hashes = vec![hash1, hash2];
2086		expected_hashes.sort();
2087		assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
2088
2089		// No more pending
2090		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
2091	}
2092
2093	#[tokio::test]
2094	async fn test_peer_disconnected_on_flooding() {
2095		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2096			build_handler();
2097
2098		let peer_id = *handler.peers.keys().next().unwrap();
2099
2100		let mut flood_statements = Vec::new();
2101		for i in 0..600_000 {
2102			let mut statement = Statement::new();
2103			statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2104			flood_statements.push(statement);
2105		}
2106
2107		handler.on_statements(peer_id, flood_statements);
2108
2109		let reports = network.get_reports();
2110		assert!(
2111			reports
2112				.iter()
2113				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2114			"Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2115			reports
2116		);
2117
2118		let disconnected = network.get_disconnected_peers();
2119		assert!(
2120			disconnected.contains(&peer_id),
2121			"Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2122			peer_id,
2123			disconnected
2124		);
2125
2126		// Verify peer state was cleaned up
2127		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2128		assert!(
2129			!handler.pending_initial_syncs.contains_key(&peer_id),
2130			"Peer should be removed from pending_initial_syncs"
2131		);
2132		assert!(
2133			!handler.initial_sync_peer_queue.contains(&peer_id),
2134			"Peer should be removed from initial_sync_peer_queue"
2135		);
2136	}
2137
2138	#[tokio::test]
2139	async fn test_legitimate_traffic_not_flagged() {
2140		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2141			build_handler();
2142
2143		let peer_id = *handler.peers.keys().next().unwrap();
2144
2145		let start = std::time::Instant::now();
2146		let duration = std::time::Duration::from_secs(5);
2147		let mut counter = 0u32;
2148
2149		while start.elapsed() < duration {
2150			let mut statements = Vec::new();
2151			for i in 0..5_000 {
2152				let mut statement = Statement::new();
2153				statement.set_plain_data(vec![
2154					counter as u8,
2155					(counter >> 8) as u8,
2156					(counter >> 16) as u8,
2157					i as u8,
2158				]);
2159				statements.push(statement);
2160				counter = counter.wrapping_add(1);
2161			}
2162
2163			handler.on_statements(peer_id, statements);
2164
2165			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2166		}
2167
2168		let reports = network.get_reports();
2169		assert!(
2170			!reports
2171				.iter()
2172				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2173			"Legitimate traffic should not trigger flooding detection. Reports: {:?}",
2174			reports
2175		);
2176
2177		let disconnected = network.get_disconnected_peers();
2178		assert!(
2179			!disconnected.contains(&peer_id),
2180			"Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
2181			disconnected
2182		);
2183
2184		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
2185	}
2186
2187	#[tokio::test]
2188	async fn test_just_over_rate_limit_triggers_flooding() {
2189		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2190			build_handler();
2191
2192		let peer_id = *handler.peers.keys().next().unwrap();
2193
2194		let mut statements = Vec::new();
2195		for i in 0..260_000 {
2196			let mut statement = Statement::new();
2197			statement.set_plain_data(vec![
2198				i as u8,
2199				(i >> 8) as u8,
2200				(i >> 16) as u8,
2201				(i >> 24) as u8,
2202			]);
2203			statements.push(statement);
2204		}
2205
2206		handler.on_statements(peer_id, statements);
2207
2208		let reports = network.get_reports();
2209		let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2210		assert!(
2211			reports
2212				.iter()
2213				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2214			"Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2215			expected_burst,
2216			reports
2217		);
2218
2219		let disconnected = network.get_disconnected_peers();
2220		assert!(
2221			disconnected.contains(&peer_id),
2222			"Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2223			disconnected
2224		);
2225
2226		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2227	}
2228
2229	#[tokio::test]
2230	async fn test_burst_of_250k_statements_allowed() {
2231		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2232			build_handler();
2233
2234		let peer_id = *handler.peers.keys().next().unwrap();
2235
2236		let mut statements = Vec::new();
2237		for i in 0..250_000 {
2238			let mut statement = Statement::new();
2239			statement.set_plain_data(vec![
2240				i as u8,
2241				(i >> 8) as u8,
2242				(i >> 16) as u8,
2243				(i >> 24) as u8,
2244			]);
2245			statements.push(statement);
2246		}
2247
2248		handler.on_statements(peer_id, statements);
2249
2250		let reports = network.get_reports();
2251		assert!(
2252			!reports
2253				.iter()
2254				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2255			"250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2256			reports
2257		);
2258
2259		assert!(
2260			handler.peers.contains_key(&peer_id),
2261			"Peer should still be connected after 250k burst"
2262		);
2263	}
2264
2265	#[tokio::test]
2266	async fn test_sustained_rate_above_limit_triggers_flooding() {
2267		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2268			build_handler();
2269
2270		let peer_id = *handler.peers.keys().next().unwrap();
2271
2272		let mut counter = 0u32;
2273
2274		let start = std::time::Instant::now();
2275		let duration = std::time::Duration::from_secs(5);
2276
2277		let mut flooding_detected = false;
2278		while start.elapsed() < duration {
2279			let mut statements = Vec::new();
2280			for i in 0..30_000 {
2281				let mut statement = Statement::new();
2282				statement.set_plain_data(vec![
2283					counter as u8,
2284					(counter >> 8) as u8,
2285					(counter >> 16) as u8,
2286					i as u8,
2287				]);
2288				statements.push(statement);
2289				counter = counter.wrapping_add(1);
2290			}
2291
2292			handler.on_statements(peer_id, statements);
2293
2294			// Check if flooding was detected
2295			let reports = network.get_reports();
2296			if reports
2297				.iter()
2298				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
2299			{
2300				flooding_detected = true;
2301				break;
2302			}
2303
2304			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2305		}
2306
2307		assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
2308
2309		let disconnected = network.get_disconnected_peers();
2310		assert!(
2311			disconnected.contains(&peer_id),
2312			"Peer should be disconnected after sustained high rate. Disconnected: {:?}",
2313			disconnected
2314		);
2315
2316		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2317	}
2318}