1use crate::config::*;
30
31use codec::{Compact, Decode, Encode, MaxEncodedLen};
32#[cfg(any(test, feature = "test-helpers"))]
33use futures::future::pending;
34use futures::{channel::oneshot, future::FusedFuture, prelude::*, stream::FuturesUnordered};
35use governor::{
36 clock::DefaultClock,
37 state::{InMemoryState, NotKeyed},
38 Quota, RateLimiter,
39};
40use prometheus_endpoint::{
41 exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
42 Registry, U64,
43};
44use sc_network::{
45 config::{NonReservedPeerMode, SetConfig},
46 error, multiaddr,
47 peer_store::PeerStoreProvider,
48 service::{
49 traits::{NotificationEvent, NotificationService, ValidationResult},
50 NotificationMetrics,
51 },
52 types::ProtocolName,
53 utils::{interval, LruHashSet},
54 NetworkBackend, NetworkEventStream, NetworkPeers,
55};
56use sc_network_sync::{SyncEvent, SyncEventStream};
57use sc_network_types::PeerId;
58use sp_runtime::traits::Block as BlockT;
59use sp_statement_store::{
60 FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
61};
62use std::{
63 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
64 iter,
65 num::{NonZeroU32, NonZeroUsize},
66 pin::Pin,
67 sync::Arc,
68 time::Instant,
69};
70use tokio::time::timeout;
71pub mod config;
72
73pub type Statements = Vec<Statement>;
75pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
77
78mod rep {
79 use sc_network::ReputationChange as Rep;
80 pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
85 pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
87 pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
89 pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
91 pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
93 pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
95}
96
97const LOG_TARGET: &str = "statement-gossip";
98const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
100const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
102
103struct Metrics {
104 propagated_statements: Counter<U64>,
105 known_statements_received: Counter<U64>,
106 skipped_oversized_statements: Counter<U64>,
107 propagated_statements_chunks: Histogram,
108 pending_statements: Gauge<U64>,
109 ignored_statements: Counter<U64>,
110 peers_connected: Gauge<U64>,
111 statements_received: Counter<U64>,
112 bytes_sent_total: Counter<U64>,
113 bytes_received_total: Counter<U64>,
114 sent_latency_seconds: Histogram,
115 initial_sync_statements_sent: Counter<U64>,
116 initial_sync_bursts_total: Counter<U64>,
117 initial_sync_peers_active: Gauge<U64>,
118 initial_sync_duration_seconds: Histogram,
119 statement_flooding_detected: Counter<U64>,
120}
121
122impl Metrics {
123 fn register(r: &Registry) -> Result<Self, PrometheusError> {
124 Ok(Self {
125 propagated_statements: register(
126 Counter::new(
127 "substrate_sync_propagated_statements",
128 "Number of statements propagated to at least one peer",
129 )?,
130 r,
131 )?,
132 known_statements_received: register(
133 Counter::new(
134 "substrate_sync_known_statement_received",
135 "Number of statements received via gossiping that were already in the statement store",
136 )?,
137 r,
138 )?,
139 skipped_oversized_statements: register(
140 Counter::new(
141 "substrate_sync_skipped_oversized_statements",
142 "Number of oversized statements that were skipped to be gossiped",
143 )?,
144 r,
145 )?,
146 propagated_statements_chunks: register(
147 Histogram::with_opts(
148 HistogramOpts::new(
149 "substrate_sync_propagated_statements_chunks",
150 "Distribution of chunk sizes when propagating statements",
151 )
152 .buckets(exponential_buckets(1.0, 2.0, 14)?),
153 )?,
154 r,
155 )?,
156 pending_statements: register(
157 Gauge::new(
158 "substrate_sync_pending_statement_validations",
159 "Number of pending statement validations",
160 )?,
161 r,
162 )?,
163 ignored_statements: register(
164 Counter::new(
165 "substrate_sync_ignored_statements",
166 "Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
167 )?,
168 r,
169 )?,
170 peers_connected: register(
171 Gauge::new(
172 "substrate_sync_statement_peers_connected",
173 "Number of peers connected using the statement protocol",
174 )?,
175 r,
176 )?,
177 statements_received: register(
178 Counter::new(
179 "substrate_sync_statements_received",
180 "Total number of statements received from peers",
181 )?,
182 r,
183 )?,
184 bytes_sent_total: register(
185 Counter::new(
186 "substrate_sync_statement_bytes_sent_total",
187 "Total bytes sent for statement protocol messages",
188 )?,
189 r,
190 )?,
191 bytes_received_total: register(
192 Counter::new(
193 "substrate_sync_statement_bytes_received_total",
194 "Total bytes received for statement protocol messages",
195 )?,
196 r,
197 )?,
198 sent_latency_seconds: register(
199 Histogram::with_opts(
200 HistogramOpts::new(
201 "substrate_sync_statement_sent_latency_seconds",
202 "Time to send statement messages to peers",
203 )
204 .buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
206 )?,
207 r,
208 )?,
209 initial_sync_statements_sent: register(
210 Counter::new(
211 "substrate_sync_initial_sync_statements_sent",
212 "Total statements sent during initial sync bursts to newly connected peers",
213 )?,
214 r,
215 )?,
216 initial_sync_bursts_total: register(
217 Counter::new(
218 "substrate_sync_initial_sync_bursts_total",
219 "Total number of initial sync burst rounds processed",
220 )?,
221 r,
222 )?,
223 initial_sync_peers_active: register(
224 Gauge::new(
225 "substrate_sync_initial_sync_peers_active",
226 "Number of peers currently being synced via initial sync",
227 )?,
228 r,
229 )?,
230 initial_sync_duration_seconds: register(
231 Histogram::with_opts(
232 HistogramOpts::new(
233 "substrate_sync_initial_sync_duration_seconds",
234 "Per-peer total duration of initial sync from start to completion",
235 )
236 .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
237 )?,
238 r,
239 )?,
240 statement_flooding_detected: register(
241 Counter::new(
242 "substrate_sync_statement_flooding_detected",
243 "Number of peers disconnected for exceeding statement rate limits",
244 )?,
245 r,
246 )?,
247 })
248 }
249}
250
251pub struct StatementHandlerPrototype {
253 protocol_name: ProtocolName,
254 notification_service: Box<dyn NotificationService>,
255}
256
257impl StatementHandlerPrototype {
258 pub fn new<
260 Hash: AsRef<[u8]>,
261 Block: BlockT,
262 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
263 >(
264 genesis_hash: Hash,
265 fork_id: Option<&str>,
266 metrics: NotificationMetrics,
267 peer_store_handle: Arc<dyn PeerStoreProvider>,
268 ) -> (Self, Net::NotificationProtocolConfig) {
269 let genesis_hash = genesis_hash.as_ref();
270 let protocol_name = if let Some(fork_id) = fork_id {
271 format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
272 } else {
273 format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
274 };
275 let (config, notification_service) = Net::notification_config(
276 protocol_name.clone().into(),
277 Vec::new(),
278 MAX_STATEMENT_NOTIFICATION_SIZE,
279 None,
280 SetConfig {
281 in_peers: 0,
282 out_peers: 0,
283 reserved_nodes: Vec::new(),
284 non_reserved_mode: NonReservedPeerMode::Deny,
285 },
286 metrics,
287 peer_store_handle,
288 );
289
290 (Self { protocol_name: protocol_name.into(), notification_service }, config)
291 }
292
293 pub fn build<
298 N: NetworkPeers + NetworkEventStream,
299 S: SyncEventStream + sp_consensus::SyncOracle,
300 >(
301 self,
302 network: N,
303 sync: S,
304 statement_store: Arc<dyn StatementStore>,
305 metrics_registry: Option<&Registry>,
306 executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
307 mut num_submission_workers: usize,
308 statements_per_second: u32,
309 ) -> error::Result<StatementHandler<N, S>> {
310 let sync_event_stream = sync.event_stream("statement-handler-sync");
311 let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
312
313 if num_submission_workers == 0 {
314 log::warn!(
315 target: LOG_TARGET,
316 "num_submission_workers is 0, defaulting to 1"
317 );
318 num_submission_workers = 1;
319 }
320
321 let statements_per_second = match NonZeroU32::new(statements_per_second) {
322 Some(rate) => rate,
323 None => {
324 log::warn!(
325 target: LOG_TARGET,
326 "statements_per_second is 0, defaulting to {}",
327 DEFAULT_STATEMENTS_PER_SECOND
328 );
329 NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
330 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
331 },
332 };
333
334 let metrics =
335 if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };
336
337 for _ in 0..num_submission_workers {
338 let store = statement_store.clone();
339 let mut queue_receiver = queue_receiver.clone();
340 executor(
341 async move {
342 loop {
343 let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
344 queue_receiver.next().await;
345 match task {
346 None => return,
347 Some((statement, completion)) => {
348 let result = store.submit(statement, StatementSource::Network);
349 if completion.send(result).is_err() {
350 log::debug!(
351 target: LOG_TARGET,
352 "Error sending validation completion"
353 );
354 }
355 },
356 }
357 }
358 }
359 .boxed(),
360 );
361 }
362
363 let handler = StatementHandler {
364 protocol_name: self.protocol_name,
365 notification_service: self.notification_service,
366 propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
367 as Pin<Box<dyn Stream<Item = ()> + Send>>)
368 .fuse(),
369 pending_statements: FuturesUnordered::new(),
370 pending_statements_peers: HashMap::new(),
371 network,
372 sync,
373 sync_event_stream: sync_event_stream.fuse(),
374 peers: HashMap::new(),
375 statement_store,
376 queue_sender,
377 statements_per_second,
378 metrics,
379 initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
380 pending_initial_syncs: HashMap::new(),
381 initial_sync_peer_queue: VecDeque::new(),
382 };
383
384 Ok(handler)
385 }
386}
387
388pub struct StatementHandler<
390 N: NetworkPeers + NetworkEventStream,
391 S: SyncEventStream + sp_consensus::SyncOracle,
392> {
393 protocol_name: ProtocolName,
394 propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
396 pending_statements:
398 FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
399 pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
404 network: N,
406 sync: S,
408 sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
410 notification_service: Box<dyn NotificationService>,
412 peers: HashMap<PeerId, Peer>,
414 statement_store: Arc<dyn StatementStore>,
415 queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
416 statements_per_second: NonZeroU32,
418 metrics: Option<Metrics>,
420 initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
422 pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
424 initial_sync_peer_queue: VecDeque<PeerId>,
426}
427
428#[derive(Debug)]
433struct PeerRateLimiter {
434 limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
435}
436
437impl PeerRateLimiter {
438 fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
439 let quota = Quota::per_second(statements_per_second).allow_burst(burst);
440 Self { limiter: RateLimiter::direct(quota) }
441 }
442
443 fn is_flooding(&self, count: usize) -> bool {
445 if count > u32::MAX as usize {
446 return true;
447 }
448
449 let Some(n) = NonZeroU32::new(count as u32) else {
450 return false;
451 };
452 !matches!(self.limiter.check_n(n), Ok(Ok(())))
453 }
454}
455
456#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
458#[derive(Debug)]
459pub struct Peer {
460 known_statements: LruHashSet<Hash>,
462 rate_limiter: PeerRateLimiter,
464}
465
466struct PendingInitialSync {
468 hashes: Vec<Hash>,
469 started_at: Instant,
470}
471
472enum ChunkResult {
474 Send(usize),
476 SkipOversized,
478}
479
480enum SendChunkResult {
482 Sent(usize),
484 Skipped,
486 Empty,
488 Failed,
490}
491
492fn max_statement_payload_size() -> usize {
497 MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len()
498}
499
500fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
507 if statements.is_empty() {
508 return ChunkResult::Send(0);
509 }
510 let max_size = max_statement_payload_size();
511
512 let mut accumulated_size = 0;
517 let mut count = 0usize;
518
519 for stmt in &statements[0..] {
520 let stmt_size = stmt.encoded_size();
521 let new_count = count + 1;
522 let new_total = accumulated_size + stmt_size;
524 if new_total > max_size {
525 break;
526 }
527
528 accumulated_size += stmt_size;
529 count = new_count;
530 }
531
532 if count == 0 {
534 ChunkResult::SkipOversized
535 } else {
536 ChunkResult::Send(count)
537 }
538}
539
540impl Peer {
541 #[cfg(any(test, feature = "test-helpers"))]
543 pub fn new_for_testing(
544 known_statements: LruHashSet<Hash>,
545 statements_per_second: NonZeroU32,
546 burst: NonZeroU32,
547 ) -> Self {
548 Self { known_statements, rate_limiter: PeerRateLimiter::new(statements_per_second, burst) }
549 }
550}
551
552impl<N, S> StatementHandler<N, S>
553where
554 N: NetworkPeers + NetworkEventStream,
555 S: SyncEventStream + sp_consensus::SyncOracle,
556{
557 #[cfg(any(test, feature = "test-helpers"))]
559 pub fn new_for_testing(
560 protocol_name: ProtocolName,
561 notification_service: Box<dyn NotificationService>,
562 propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
563 network: N,
564 sync: S,
565 sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
566 peers: HashMap<PeerId, Peer>,
567 statement_store: Arc<dyn StatementStore>,
568 queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
569 statements_per_second: NonZeroU32,
570 ) -> Self {
571 Self {
572 protocol_name,
573 notification_service,
574 propagate_timeout,
575 pending_statements: FuturesUnordered::new(),
576 pending_statements_peers: HashMap::new(),
577 network,
578 sync,
579 sync_event_stream,
580 peers,
581 statement_store,
582 queue_sender,
583 statements_per_second,
584 metrics: None,
585 initial_sync_timeout: Box::pin(pending().fuse()),
586 pending_initial_syncs: HashMap::new(),
587 initial_sync_peer_queue: VecDeque::new(),
588 }
589 }
590
591 #[cfg(any(test, feature = "test-helpers"))]
593 pub fn pending_statements_mut(
594 &mut self,
595 ) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
596 {
597 &mut self.pending_statements
598 }
599
600 pub async fn run(mut self) {
603 loop {
604 futures::select_biased! {
605 _ = self.propagate_timeout.next() => {
606 self.propagate_statements().await;
607 self.metrics.as_ref().map(|metrics| {
608 metrics.pending_statements.set(self.pending_statements.len() as u64);
609 });
610 },
611 (hash, result) = self.pending_statements.select_next_some() => {
612 if let Some(peers) = self.pending_statements_peers.remove(&hash) {
613 if let Some(result) = result {
614 peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
615 }
616 } else {
617 log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
618 }
619 },
620 sync_event = self.sync_event_stream.next() => {
621 if let Some(sync_event) = sync_event {
622 self.handle_sync_event(sync_event);
623 } else {
624 return;
626 }
627 }
628 event = self.notification_service.next_event().fuse() => {
629 if let Some(event) = event {
630 self.handle_notification_event(event).await
631 } else {
632 return
634 }
635 }
636 _ = &mut self.initial_sync_timeout => {
637 self.process_initial_sync_burst().await;
638 self.initial_sync_timeout =
639 Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
640 },
641 }
642 }
643 }
644
645 async fn send_statement_chunk(
647 &mut self,
648 peer: &PeerId,
649 statements: &[&Statement],
650 ) -> SendChunkResult {
651 match find_sendable_chunk(statements) {
652 ChunkResult::Send(0) => SendChunkResult::Empty,
653 ChunkResult::Send(chunk_end) => {
654 let chunk = &statements[..chunk_end];
655 let encoded = chunk.encode();
656 let bytes_to_send = encoded.len() as u64;
657
658 let sent_latency_timer =
659 self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
660 let send_result = timeout(
661 SEND_TIMEOUT,
662 self.notification_service.send_async_notification(peer, encoded),
663 )
664 .await;
665 drop(sent_latency_timer);
666
667 if let Err(e) = send_result {
668 log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
669 return SendChunkResult::Failed;
670 }
671
672 log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
673 self.metrics.as_ref().map(|metrics| {
674 metrics.propagated_statements.inc_by(chunk.len() as u64);
675 metrics.bytes_sent_total.inc_by(bytes_to_send);
676 metrics.propagated_statements_chunks.observe(chunk.len() as f64);
677 });
678 SendChunkResult::Sent(chunk_end)
679 },
680 ChunkResult::SkipOversized => {
681 log::warn!(target: LOG_TARGET, "Statement too large, skipping");
682 self.metrics.as_ref().map(|metrics| {
683 metrics.skipped_oversized_statements.inc();
684 });
685 SendChunkResult::Skipped
686 },
687 }
688 }
689
690 fn handle_sync_event(&mut self, event: SyncEvent) {
691 match event {
692 SyncEvent::PeerConnected(remote) => {
693 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
694 .collect::<multiaddr::Multiaddr>();
695 let result = self.network.add_peers_to_reserved_set(
696 self.protocol_name.clone(),
697 iter::once(addr).collect(),
698 );
699 if let Err(err) = result {
700 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
701 }
702 },
703 SyncEvent::PeerDisconnected(remote) => {
704 let result = self.network.remove_peers_from_reserved_set(
705 self.protocol_name.clone(),
706 iter::once(remote).collect(),
707 );
708 if let Err(err) = result {
709 log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
710 }
711 },
712 }
713 }
714
715 async fn handle_notification_event(&mut self, event: NotificationEvent) {
716 match event {
717 NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
718 let result = self
720 .network
721 .peer_role(peer, handshake)
722 .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
723 let _ = result_tx.send(result);
724 },
725 NotificationEvent::NotificationStreamOpened { peer, .. } => {
726 let _was_in = self.peers.insert(
727 peer,
728 Peer {
729 known_statements: LruHashSet::new(
730 NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
731 ),
732 rate_limiter: PeerRateLimiter::new(
733 self.statements_per_second,
734 NonZeroU32::new(
735 self.statements_per_second.get() *
736 config::STATEMENTS_BURST_COEFFICIENT,
737 )
738 .expect("burst capacity is nonzero"),
739 ),
740 },
741 );
742 debug_assert!(_was_in.is_none());
743
744 self.metrics.as_ref().map(|metrics| {
745 metrics.peers_connected.set(self.peers.len() as u64);
746 });
747
748 if !self.sync.is_major_syncing() {
749 let hashes = self.statement_store.statement_hashes();
750 if !hashes.is_empty() {
751 self.pending_initial_syncs.insert(
752 peer,
753 PendingInitialSync { hashes, started_at: Instant::now() },
754 );
755 self.initial_sync_peer_queue.push_back(peer);
756 self.metrics.as_ref().map(|metrics| {
757 metrics.initial_sync_peers_active.inc();
758 });
759 }
760 }
761 },
762 NotificationEvent::NotificationStreamClosed { peer } => {
763 let _peer = self.peers.remove(&peer);
764 debug_assert!(_peer.is_some());
765 if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
766 self.metrics.as_ref().map(|metrics| {
767 metrics.initial_sync_peers_active.dec();
768 metrics
769 .initial_sync_duration_seconds
770 .observe(pending.started_at.elapsed().as_secs_f64());
771 });
772 }
773 self.initial_sync_peer_queue.retain(|p| *p != peer);
774 self.metrics.as_ref().map(|metrics| {
775 metrics.peers_connected.set(self.peers.len() as u64);
776 });
777 },
778 NotificationEvent::NotificationReceived { peer, notification } => {
779 let bytes_received = notification.len() as u64;
780 self.metrics.as_ref().map(|metrics| {
781 metrics.bytes_received_total.inc_by(bytes_received);
782 });
783
784 if self.sync.is_major_syncing() {
786 log::trace!(
787 target: LOG_TARGET,
788 "{peer}: Ignoring statements while major syncing or offline"
789 );
790 return;
791 }
792
793 if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
794 self.on_statements(peer, statements);
795 } else {
796 log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
797 }
798 },
799 }
800 }
801
802 #[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
804 pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
805 log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
806
807 self.metrics.as_ref().map(|metrics| {
808 metrics.statements_received.inc_by(statements.len() as u64);
809 });
810
811 if let Some(ref mut peer) = self.peers.get_mut(&who) {
812 if peer.rate_limiter.is_flooding(statements.len()) {
813 log::warn!(
814 target: LOG_TARGET,
815 "Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
816 who,
817 self.statements_per_second
818 );
819
820 self.network.report_peer(who, rep::STATEMENT_FLOODING);
821 self.network.disconnect_peer(who, self.protocol_name.clone());
822 if let Some(ref metrics) = self.metrics {
823 metrics.statement_flooding_detected.inc();
824 }
825
826 self.peers.remove(&who);
828 self.pending_initial_syncs.remove(&who);
829 self.initial_sync_peer_queue.retain(|p| *p != who);
830
831 return;
832 }
833
834 let mut statements_left = statements.len() as u64;
835 for s in statements {
836 if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
837 log::debug!(
838 target: LOG_TARGET,
839 "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
840 statements_left,
841 MAX_PENDING_STATEMENTS,
842 );
843 self.metrics.as_ref().map(|metrics| {
844 metrics.ignored_statements.inc_by(statements_left);
845 });
846 break;
847 }
848
849 let hash = s.hash();
850 peer.known_statements.insert(hash);
851
852 if self.statement_store.has_statement(&hash) {
853 self.metrics.as_ref().map(|metrics| {
854 metrics.known_statements_received.inc();
855 });
856
857 if let Some(peers) = self.pending_statements_peers.get(&hash) {
858 if peers.contains(&who) {
859 log::trace!(
860 target: LOG_TARGET,
861 "Already received the statement from the same peer {who}.",
862 );
863 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
864 }
865 }
866 continue;
867 }
868
869 self.network.report_peer(who, rep::ANY_STATEMENT);
870
871 match self.pending_statements_peers.entry(hash) {
872 Entry::Vacant(entry) => {
873 let (completion_sender, completion_receiver) = oneshot::channel();
874 match self.queue_sender.try_send((s, completion_sender)) {
875 Ok(()) => {
876 self.pending_statements.push(
877 async move {
878 let res = completion_receiver.await;
879 (hash, res.ok())
880 }
881 .boxed(),
882 );
883 entry.insert(HashSet::from_iter([who]));
884 },
885 Err(async_channel::TrySendError::Full(_)) => {
886 log::debug!(
887 target: LOG_TARGET,
888 "Dropped statement because validation channel is full",
889 );
890 },
891 Err(async_channel::TrySendError::Closed(_)) => {
892 log::trace!(
893 target: LOG_TARGET,
894 "Dropped statement because validation channel is closed",
895 );
896 },
897 }
898 },
899 Entry::Occupied(mut entry) => {
900 if !entry.get_mut().insert(who) {
901 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
903 }
904 },
905 }
906
907 statements_left -= 1;
908 }
909 }
910 }
911
912 fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
913 match import {
914 SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
915 SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
916 SubmitResult::KnownExpired => {},
917 SubmitResult::Rejected(_) => {},
918 SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
919 SubmitResult::InternalError(_) => {},
920 }
921 }
922
923 pub async fn propagate_statement(&mut self, hash: &Hash) {
925 if self.sync.is_major_syncing() {
927 return;
928 }
929
930 log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
931 if let Ok(Some(statement)) = self.statement_store.statement(hash) {
932 self.do_propagate_statements(&[(*hash, statement)]).await;
933 }
934 }
935
936 async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
940 let Some(peer) = self.peers.get_mut(who) else {
941 return;
942 };
943
944 let to_send: Vec<_> = statements
945 .iter()
946 .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
947 .collect();
948
949 log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
950
951 if to_send.is_empty() {
952 return;
953 }
954
955 self.send_statements_in_chunks(who, &to_send).await;
956 }
957
958 async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
960 let mut offset = 0;
961 while offset < statements.len() {
962 match self.send_statement_chunk(who, &statements[offset..]).await {
963 SendChunkResult::Sent(chunk_end) => {
964 offset += chunk_end;
965 },
966 SendChunkResult::Skipped => {
967 offset += 1;
968 },
969 SendChunkResult::Empty | SendChunkResult::Failed => return,
970 }
971 }
972 }
973
974 async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
975 log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
976 let peers: Vec<_> = self.peers.keys().copied().collect();
977 for who in peers {
978 log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
979 self.send_statements_to_peer(&who, statements).await;
980 }
981 log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
982 }
983
984 async fn propagate_statements(&mut self) {
986 if self.sync.is_major_syncing() {
988 return;
989 }
990
991 let Ok(statements) = self.statement_store.take_recent_statements() else { return };
992 if !statements.is_empty() {
993 self.do_propagate_statements(&statements).await;
994 }
995 }
996
997 fn record_initial_sync_completion(&self, started_at: Instant) {
999 self.metrics.as_ref().map(|metrics| {
1000 metrics.initial_sync_peers_active.dec();
1001 metrics
1002 .initial_sync_duration_seconds
1003 .observe(started_at.elapsed().as_secs_f64());
1004 });
1005 }
1006
1007 async fn process_initial_sync_burst(&mut self) {
1009 if self.sync.is_major_syncing() {
1010 return;
1011 }
1012
1013 let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
1014 return;
1015 };
1016
1017 let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
1018 return;
1019 };
1020
1021 self.metrics.as_ref().map(|metrics| {
1022 metrics.initial_sync_bursts_total.inc();
1023 });
1024
1025 if entry.get().hashes.is_empty() {
1026 let started_at = entry.get().started_at;
1027 entry.remove();
1028 self.record_initial_sync_completion(started_at);
1029 return;
1030 }
1031
1032 let max_size = max_statement_payload_size();
1034 let mut accumulated_size = 0;
1035 let (statements, processed) = match self.statement_store.statements_by_hashes(
1036 &entry.get().hashes,
1037 &mut |_hash, encoded, _stmt| {
1038 if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
1039 return FilterDecision::Abort;
1040 }
1041 accumulated_size += encoded.len();
1042 FilterDecision::Take
1043 },
1044 ) {
1045 Ok(r) => r,
1046 Err(e) => {
1047 log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
1048 let started_at = entry.get().started_at;
1049 entry.remove();
1050 self.record_initial_sync_completion(started_at);
1051 return;
1052 },
1053 };
1054
1055 entry.get_mut().hashes.drain(..processed);
1057 let has_more = !entry.get().hashes.is_empty();
1058 drop(entry);
1059
1060 let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
1062 match self.send_statement_chunk(&peer_id, &to_send).await {
1063 SendChunkResult::Failed => {
1064 if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1065 self.record_initial_sync_completion(pending.started_at);
1066 }
1067 return;
1068 },
1069 SendChunkResult::Sent(sent) => {
1070 debug_assert_eq!(to_send.len(), sent);
1071 self.metrics.as_ref().map(|metrics| {
1072 metrics.initial_sync_statements_sent.inc_by(sent as u64);
1073 });
1074 if let Some(peer) = self.peers.get_mut(&peer_id) {
1076 for (hash, _) in &statements {
1077 peer.known_statements.insert(*hash);
1078 }
1079 }
1080 },
1081 SendChunkResult::Empty | SendChunkResult::Skipped => {},
1082 }
1083
1084 if has_more {
1086 self.initial_sync_peer_queue.push_back(peer_id);
1087 } else {
1088 if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
1089 self.record_initial_sync_completion(pending.started_at);
1090 }
1091 }
1092 }
1093}
1094
1095#[cfg(test)]
1096mod tests {
1097
1098 use super::*;
1099 use std::sync::Mutex;
1100
1101 #[derive(Clone)]
1102 struct TestNetwork {
1103 reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
1104 disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
1105 }
1106
1107 impl TestNetwork {
1108 fn new() -> Self {
1109 Self {
1110 reported_peers: Arc::new(Mutex::new(Vec::new())),
1111 disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1112 }
1113 }
1114
1115 fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
1116 self.reported_peers.lock().unwrap().clone()
1117 }
1118
1119 fn get_disconnected_peers(&self) -> Vec<PeerId> {
1120 self.disconnected_peers.lock().unwrap().clone()
1121 }
1122 }
1123
1124 #[async_trait::async_trait]
1125 impl NetworkPeers for TestNetwork {
1126 fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
1127 unimplemented!()
1128 }
1129
1130 fn set_authorized_only(&self, _: bool) {
1131 unimplemented!()
1132 }
1133
1134 fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
1135 unimplemented!()
1136 }
1137
1138 fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
1139 self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
1140 }
1141
1142 fn peer_reputation(&self, _: &PeerId) -> i32 {
1143 unimplemented!()
1144 }
1145
1146 fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
1147 self.disconnected_peers.lock().unwrap().push(peer);
1148 }
1149
1150 fn accept_unreserved_peers(&self) {
1151 unimplemented!()
1152 }
1153
1154 fn deny_unreserved_peers(&self) {
1155 unimplemented!()
1156 }
1157
1158 fn add_reserved_peer(
1159 &self,
1160 _: sc_network::config::MultiaddrWithPeerId,
1161 ) -> Result<(), String> {
1162 unimplemented!()
1163 }
1164
1165 fn remove_reserved_peer(&self, _: PeerId) {
1166 unimplemented!()
1167 }
1168
1169 fn set_reserved_peers(
1170 &self,
1171 _: sc_network::ProtocolName,
1172 _: std::collections::HashSet<sc_network::Multiaddr>,
1173 ) -> Result<(), String> {
1174 unimplemented!()
1175 }
1176
1177 fn add_peers_to_reserved_set(
1178 &self,
1179 _: sc_network::ProtocolName,
1180 _: std::collections::HashSet<sc_network::Multiaddr>,
1181 ) -> Result<(), String> {
1182 unimplemented!()
1183 }
1184
1185 fn remove_peers_from_reserved_set(
1186 &self,
1187 _: sc_network::ProtocolName,
1188 _: Vec<PeerId>,
1189 ) -> Result<(), String> {
1190 unimplemented!()
1191 }
1192
1193 fn sync_num_connected(&self) -> usize {
1194 unimplemented!()
1195 }
1196
1197 fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
1198 unimplemented!()
1199 }
1200
1201 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
1202 unimplemented!();
1203 }
1204 }
1205
1206 struct TestSync {}
1207
1208 impl SyncEventStream for TestSync {
1209 fn event_stream(
1210 &self,
1211 _name: &'static str,
1212 ) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
1213 unimplemented!()
1214 }
1215 }
1216
1217 impl sp_consensus::SyncOracle for TestSync {
1218 fn is_major_syncing(&self) -> bool {
1219 false
1220 }
1221
1222 fn is_offline(&self) -> bool {
1223 unimplemented!()
1224 }
1225 }
1226
1227 impl NetworkEventStream for TestNetwork {
1228 fn event_stream(
1229 &self,
1230 _name: &'static str,
1231 ) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
1232 unimplemented!()
1233 }
1234 }
1235
1236 #[derive(Debug, Clone)]
1237 struct TestNotificationService {
1238 sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
1239 }
1240
1241 impl TestNotificationService {
1242 fn new() -> Self {
1243 Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1244 }
1245
1246 fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1247 self.sent_notifications.lock().unwrap().clone()
1248 }
1249 }
1250
1251 #[async_trait::async_trait]
1252 impl NotificationService for TestNotificationService {
1253 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1254 unimplemented!()
1255 }
1256
1257 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1258 unimplemented!()
1259 }
1260
1261 fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
1262 self.sent_notifications.lock().unwrap().push((*peer, notification));
1263 }
1264
1265 async fn send_async_notification(
1266 &mut self,
1267 peer: &PeerId,
1268 notification: Vec<u8>,
1269 ) -> Result<(), sc_network::error::Error> {
1270 self.sent_notifications.lock().unwrap().push((*peer, notification));
1271 Ok(())
1272 }
1273
1274 async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1275 unimplemented!()
1276 }
1277
1278 fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1279 unimplemented!()
1280 }
1281
1282 async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
1283 None
1284 }
1285
1286 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
1287 unimplemented!()
1288 }
1289
1290 fn protocol(&self) -> &sc_network::types::ProtocolName {
1291 unimplemented!()
1292 }
1293
1294 fn message_sink(
1295 &self,
1296 _peer: &PeerId,
1297 ) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
1298 unimplemented!()
1299 }
1300 }
1301
1302 #[derive(Clone)]
1303 struct TestStatementStore {
1304 statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1305 recent_statements:
1306 Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
1307 }
1308
1309 impl TestStatementStore {
1310 fn new() -> Self {
1311 Self { statements: Default::default(), recent_statements: Default::default() }
1312 }
1313 }
1314
1315 impl StatementStore for TestStatementStore {
1316 fn statements(
1317 &self,
1318 ) -> sp_statement_store::Result<
1319 Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1320 > {
1321 Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
1322 }
1323
1324 fn take_recent_statements(
1325 &self,
1326 ) -> sp_statement_store::Result<
1327 Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1328 > {
1329 Ok(self.recent_statements.lock().unwrap().drain().collect())
1330 }
1331
1332 fn statement(
1333 &self,
1334 _hash: &sp_statement_store::Hash,
1335 ) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
1336 unimplemented!()
1337 }
1338
1339 fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
1340 self.statements.lock().unwrap().contains_key(hash)
1341 }
1342
1343 fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
1344 self.statements.lock().unwrap().keys().cloned().collect()
1345 }
1346
1347 fn statements_by_hashes(
1348 &self,
1349 hashes: &[sp_statement_store::Hash],
1350 filter: &mut dyn FnMut(
1351 &sp_statement_store::Hash,
1352 &[u8],
1353 &sp_statement_store::Statement,
1354 ) -> FilterDecision,
1355 ) -> sp_statement_store::Result<(
1356 Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
1357 usize,
1358 )> {
1359 let statements = self.statements.lock().unwrap();
1360 let mut result = Vec::new();
1361 let mut processed = 0;
1362 for hash in hashes {
1363 let Some(stmt) = statements.get(hash) else {
1364 processed += 1;
1365 continue;
1366 };
1367 let encoded = stmt.encode();
1368 match filter(hash, &encoded, stmt) {
1369 FilterDecision::Skip => {
1370 processed += 1;
1371 },
1372 FilterDecision::Take => {
1373 processed += 1;
1374 result.push((*hash, stmt.clone()));
1375 },
1376 FilterDecision::Abort => break,
1377 }
1378 }
1379 Ok((result, processed))
1380 }
1381
1382 fn broadcasts(
1383 &self,
1384 _match_all_topics: &[sp_statement_store::Topic],
1385 ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1386 unimplemented!()
1387 }
1388
1389 fn posted(
1390 &self,
1391 _match_all_topics: &[sp_statement_store::Topic],
1392 _dest: [u8; 32],
1393 ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1394 unimplemented!()
1395 }
1396
1397 fn posted_clear(
1398 &self,
1399 _match_all_topics: &[sp_statement_store::Topic],
1400 _dest: [u8; 32],
1401 ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1402 unimplemented!()
1403 }
1404
1405 fn broadcasts_stmt(
1406 &self,
1407 _match_all_topics: &[sp_statement_store::Topic],
1408 ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1409 unimplemented!()
1410 }
1411
1412 fn posted_stmt(
1413 &self,
1414 _match_all_topics: &[sp_statement_store::Topic],
1415 _dest: [u8; 32],
1416 ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1417 unimplemented!()
1418 }
1419
1420 fn posted_clear_stmt(
1421 &self,
1422 _match_all_topics: &[sp_statement_store::Topic],
1423 _dest: [u8; 32],
1424 ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
1425 unimplemented!()
1426 }
1427
1428 fn submit(
1429 &self,
1430 _statement: sp_statement_store::Statement,
1431 _source: sp_statement_store::StatementSource,
1432 ) -> sp_statement_store::SubmitResult {
1433 unimplemented!()
1434 }
1435
1436 fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
1437 unimplemented!()
1438 }
1439
1440 fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
1441 unimplemented!()
1442 }
1443 }
1444
1445 fn build_handler() -> (
1446 StatementHandler<TestNetwork, TestSync>,
1447 TestStatementStore,
1448 TestNetwork,
1449 TestNotificationService,
1450 async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
1451 ) {
1452 let statement_store = TestStatementStore::new();
1453 let (queue_sender, queue_receiver) = async_channel::bounded(2);
1454 let network = TestNetwork::new();
1455 let notification_service = TestNotificationService::new();
1456 let peer_id = PeerId::random();
1457 let mut peers = HashMap::new();
1458 peers.insert(
1459 peer_id,
1460 Peer {
1461 known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
1462 rate_limiter: PeerRateLimiter::new(
1463 NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1464 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1465 NonZeroU32::new(
1466 DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
1467 )
1468 .expect("burst capacity is nonzero"),
1469 ),
1470 },
1471 );
1472
1473 let handler = StatementHandler {
1474 protocol_name: "/statement/1".into(),
1475 notification_service: Box::new(notification_service.clone()),
1476 propagate_timeout: (Box::pin(futures::stream::pending())
1477 as Pin<Box<dyn Stream<Item = ()> + Send>>)
1478 .fuse(),
1479 pending_statements: FuturesUnordered::new(),
1480 pending_statements_peers: HashMap::new(),
1481 network: network.clone(),
1482 sync: TestSync {},
1483 sync_event_stream: (Box::pin(futures::stream::pending())
1484 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1485 .fuse(),
1486 peers,
1487 statement_store: Arc::new(statement_store.clone()),
1488 queue_sender,
1489 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1490 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1491 metrics: None,
1492 initial_sync_timeout: Box::pin(futures::future::pending()),
1493 pending_initial_syncs: HashMap::new(),
1494 initial_sync_peer_queue: VecDeque::new(),
1495 };
1496 (handler, statement_store, network, notification_service, queue_receiver)
1497 }
1498
1499 #[tokio::test]
1500 async fn test_skips_processing_statements_that_already_in_store() {
1501 let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
1502 build_handler();
1503
1504 let mut statement1 = Statement::new();
1505 statement1.set_plain_data(b"statement1".to_vec());
1506 let hash1 = statement1.hash();
1507
1508 statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
1509
1510 let mut statement2 = Statement::new();
1511 statement2.set_plain_data(b"statement2".to_vec());
1512 let hash2 = statement2.hash();
1513
1514 let peer_id = *handler.peers.keys().next().unwrap();
1515
1516 handler.on_statements(peer_id, vec![statement1, statement2]);
1517
1518 let to_submit = queue_receiver.try_recv();
1519 assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
1520
1521 let no_more = queue_receiver.try_recv();
1522 assert!(no_more.is_err(), "Expected only one statement to be queued");
1523 }
1524
1525 #[tokio::test]
1526 async fn test_reports_for_duplicate_statements() {
1527 let (mut handler, statement_store, network, _notification_service, queue_receiver) =
1528 build_handler();
1529
1530 let peer_id = *handler.peers.keys().next().unwrap();
1531
1532 let mut statement1 = Statement::new();
1533 statement1.set_plain_data(b"statement1".to_vec());
1534
1535 handler.on_statements(peer_id, vec![statement1.clone()]);
1536 {
1537 let (s, _) = queue_receiver.try_recv().unwrap();
1539 let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1540 handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1541 }
1542
1543 handler.on_statements(peer_id, vec![statement1]);
1544
1545 let reports = network.get_reports();
1546 assert_eq!(
1547 reports,
1548 vec![
1549 (peer_id, rep::ANY_STATEMENT), (peer_id, rep::ANY_STATEMENT_REFUND), (peer_id, rep::DUPLICATE_STATEMENT) ],
1553 "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1554 reports
1555 );
1556 }
1557
1558 #[tokio::test]
1559 async fn test_splits_large_batches_into_smaller_chunks() {
1560 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1561 build_handler();
1562
1563 let num_statements = 30;
1564 let statement_size = 100 * 1024; for i in 0..num_statements {
1566 let mut statement = Statement::new();
1567 let mut data = vec![0u8; statement_size];
1568 data[0] = i as u8;
1569 statement.set_plain_data(data);
1570 let hash = statement.hash();
1571 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1572 }
1573
1574 handler.propagate_statements().await;
1575
1576 let sent = notification_service.get_sent_notifications();
1577 let mut total_statements_sent = 0;
1578 assert!(
1579 sent.len() == 3,
1580 "Expected batch to be split into 3 chunks, but got {} chunks",
1581 sent.len()
1582 );
1583 for (_peer, notification) in sent.iter() {
1584 assert!(
1585 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1586 "Notification size {} exceeds limit {}",
1587 notification.len(),
1588 MAX_STATEMENT_NOTIFICATION_SIZE
1589 );
1590 if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1591 total_statements_sent += stmts.len();
1592 }
1593 }
1594
1595 assert_eq!(
1596 total_statements_sent, num_statements,
1597 "Expected all {} statements to be sent, but only {} were sent",
1598 num_statements, total_statements_sent
1599 );
1600 }
1601
1602 #[tokio::test]
1603 async fn test_skips_only_oversized_statements() {
1604 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1605 build_handler();
1606
1607 let mut statement1 = Statement::new();
1608 statement1.set_plain_data(vec![1u8; 100]);
1609 let hash1 = statement1.hash();
1610 statement_store
1611 .recent_statements
1612 .lock()
1613 .unwrap()
1614 .insert(hash1, statement1.clone());
1615
1616 let mut oversized1 = Statement::new();
1617 oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1618 let hash_oversized1 = oversized1.hash();
1619 statement_store
1620 .recent_statements
1621 .lock()
1622 .unwrap()
1623 .insert(hash_oversized1, oversized1);
1624
1625 let mut statement2 = Statement::new();
1626 statement2.set_plain_data(vec![3u8; 100]);
1627 let hash2 = statement2.hash();
1628 statement_store
1629 .recent_statements
1630 .lock()
1631 .unwrap()
1632 .insert(hash2, statement2.clone());
1633
1634 let mut oversized2 = Statement::new();
1635 oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1636 let hash_oversized2 = oversized2.hash();
1637 statement_store
1638 .recent_statements
1639 .lock()
1640 .unwrap()
1641 .insert(hash_oversized2, oversized2);
1642
1643 let mut statement3 = Statement::new();
1644 statement3.set_plain_data(vec![5u8; 100]);
1645 let hash3 = statement3.hash();
1646 statement_store
1647 .recent_statements
1648 .lock()
1649 .unwrap()
1650 .insert(hash3, statement3.clone());
1651
1652 handler.propagate_statements().await;
1653
1654 let sent = notification_service.get_sent_notifications();
1655
1656 let mut sent_hashes = sent
1657 .iter()
1658 .flat_map(|(_peer, notification)| {
1659 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1660 })
1661 .map(|s| s.hash())
1662 .collect::<Vec<_>>();
1663 sent_hashes.sort();
1664 let mut expected_hashes = vec![hash1, hash2, hash3];
1665 expected_hashes.sort();
1666 assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1667 }
1668
1669 fn build_handler_no_peers() -> (
1670 StatementHandler<TestNetwork, TestSync>,
1671 TestStatementStore,
1672 TestNetwork,
1673 TestNotificationService,
1674 ) {
1675 let statement_store = TestStatementStore::new();
1676 let (queue_sender, _queue_receiver) = async_channel::bounded(2);
1677 let network = TestNetwork::new();
1678 let notification_service = TestNotificationService::new();
1679
1680 let handler = StatementHandler {
1681 protocol_name: "/statement/1".into(),
1682 notification_service: Box::new(notification_service.clone()),
1683 propagate_timeout: (Box::pin(futures::stream::pending())
1684 as Pin<Box<dyn Stream<Item = ()> + Send>>)
1685 .fuse(),
1686 pending_statements: FuturesUnordered::new(),
1687 pending_statements_peers: HashMap::new(),
1688 network: network.clone(),
1689 sync: TestSync {},
1690 sync_event_stream: (Box::pin(futures::stream::pending())
1691 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
1692 .fuse(),
1693 peers: HashMap::new(),
1694 statement_store: Arc::new(statement_store.clone()),
1695 queue_sender,
1696 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1697 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1698 metrics: None,
1699 initial_sync_timeout: Box::pin(futures::future::pending()),
1700 pending_initial_syncs: HashMap::new(),
1701 initial_sync_peer_queue: VecDeque::new(),
1702 };
1703 (handler, statement_store, network, notification_service)
1704 }
1705
1706 #[tokio::test]
1707 async fn test_initial_sync_burst_single_peer() {
1708 let (mut handler, statement_store, _network, notification_service) =
1709 build_handler_no_peers();
1710
1711 let num_statements = 200;
1714 let statement_size = 100 * 1024; let mut expected_hashes = Vec::new();
1716 for i in 0..num_statements {
1717 let mut statement = Statement::new();
1718 let mut data = vec![0u8; statement_size];
1719 data[0] = (i % 256) as u8;
1721 data[1] = (i / 256) as u8;
1722 statement.set_plain_data(data);
1723 let hash = statement.hash();
1724 expected_hashes.push(hash);
1725 statement_store.statements.lock().unwrap().insert(hash, statement);
1726 }
1727
1728 let peer_id = PeerId::random();
1730
1731 handler
1732 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
1733 peer: peer_id,
1734 direction: sc_network::service::traits::Direction::Inbound,
1735 handshake: vec![],
1736 negotiated_fallback: None,
1737 })
1738 .await;
1739
1740 assert!(handler.peers.contains_key(&peer_id));
1742 assert!(handler.pending_initial_syncs.contains_key(&peer_id));
1743 assert_eq!(handler.initial_sync_peer_queue.len(), 1);
1744
1745 let mut burst_count = 0;
1747 while handler.pending_initial_syncs.contains_key(&peer_id) {
1748 handler.process_initial_sync_burst().await;
1749 burst_count += 1;
1750 assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
1752 }
1753
1754 assert!(
1757 burst_count >= 10,
1758 "Expected multiple bursts for 200 statements of 100KB each, got {}",
1759 burst_count
1760 );
1761
1762 let sent = notification_service.get_sent_notifications();
1764 let mut sent_hashes: Vec<_> = sent
1765 .iter()
1766 .flat_map(|(peer, notification)| {
1767 assert_eq!(*peer, peer_id);
1768 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1769 })
1770 .map(|s| s.hash())
1771 .collect();
1772 sent_hashes.sort();
1773 expected_hashes.sort();
1774
1775 assert_eq!(
1776 sent_hashes.len(),
1777 expected_hashes.len(),
1778 "Expected {} statements to be sent, got {}",
1779 expected_hashes.len(),
1780 sent_hashes.len()
1781 );
1782 assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
1783
1784 assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
1786 assert!(handler.initial_sync_peer_queue.is_empty());
1787 }
1788
1789 #[tokio::test]
1790 async fn test_initial_sync_burst_multiple_peers_round_robin() {
1791 let (mut handler, statement_store, _network, notification_service) =
1792 build_handler_no_peers();
1793
1794 let num_statements = 200;
1796 let statement_size = 100 * 1024; let mut expected_hashes = Vec::new();
1798 for i in 0..num_statements {
1799 let mut statement = Statement::new();
1800 let mut data = vec![0u8; statement_size];
1801 data[0] = (i % 256) as u8;
1802 data[1] = (i / 256) as u8;
1803 statement.set_plain_data(data);
1804 let hash = statement.hash();
1805 expected_hashes.push(hash);
1806 statement_store.statements.lock().unwrap().insert(hash, statement);
1807 }
1808
1809 let peer1 = PeerId::random();
1811 let peer2 = PeerId::random();
1812 let peer3 = PeerId::random();
1813
1814 for peer in [peer1, peer2, peer3] {
1816 handler
1817 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
1818 peer,
1819 direction: sc_network::service::traits::Direction::Inbound,
1820 handshake: vec![],
1821 negotiated_fallback: None,
1822 })
1823 .await;
1824 }
1825
1826 assert_eq!(handler.peers.len(), 3);
1828 assert_eq!(handler.pending_initial_syncs.len(), 3);
1829 assert_eq!(handler.initial_sync_peer_queue.len(), 3);
1830
1831 let mut peer_burst_order = Vec::new();
1833 let mut burst_count = 0;
1834
1835 while !handler.pending_initial_syncs.is_empty() {
1836 if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
1838 peer_burst_order.push(next_peer);
1839 }
1840 handler.process_initial_sync_burst().await;
1841 burst_count += 1;
1842 assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
1844 }
1845
1846 assert!(
1849 burst_count >= 30,
1850 "Expected many bursts for 3 peers with 200 statements each, got {}",
1851 burst_count
1852 );
1853
1854 assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
1856 assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
1858 assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
1859 assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
1860 assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
1862 assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
1863 assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
1864
1865 let sent = notification_service.get_sent_notifications();
1867 let mut peer1_hashes: Vec<_> = sent
1868 .iter()
1869 .filter(|(peer, _)| *peer == peer1)
1870 .flat_map(|(_, notification)| {
1871 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1872 })
1873 .map(|s| s.hash())
1874 .collect();
1875 let mut peer2_hashes: Vec<_> = sent
1876 .iter()
1877 .filter(|(peer, _)| *peer == peer2)
1878 .flat_map(|(_, notification)| {
1879 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1880 })
1881 .map(|s| s.hash())
1882 .collect();
1883 let mut peer3_hashes: Vec<_> = sent
1884 .iter()
1885 .filter(|(peer, _)| *peer == peer3)
1886 .flat_map(|(_, notification)| {
1887 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1888 })
1889 .map(|s| s.hash())
1890 .collect();
1891
1892 peer1_hashes.sort();
1893 peer2_hashes.sort();
1894 peer3_hashes.sort();
1895 expected_hashes.sort();
1896
1897 assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
1898 assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
1899 assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
1900
1901 assert!(handler.pending_initial_syncs.is_empty());
1903 assert!(handler.initial_sync_peer_queue.is_empty());
1904 }
1905
1906 #[tokio::test]
1907 async fn test_send_statements_in_chunks_exact_max_size() {
1908 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1909 build_handler();
1910
1911 let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
1925 let num_statements: usize = 100;
1926 let per_statement_overhead = 1 + 1 + 8 + 1 + 2; let total_overhead = per_statement_overhead * num_statements;
1928 let total_data_size = max_size - total_overhead;
1929 let per_statement_data_size = total_data_size / num_statements;
1930 let remainder = total_data_size % num_statements;
1931
1932 let mut expected_hashes = Vec::with_capacity(num_statements);
1933 let mut total_encoded_size = 0;
1934
1935 for i in 0..num_statements {
1936 let mut statement = Statement::new();
1937 let extra = if i < remainder { 1 } else { 0 };
1939 let mut data = vec![42u8; per_statement_data_size + extra];
1940 data[0] = i as u8;
1942 data[1] = (i >> 8) as u8;
1943 statement.set_plain_data(data);
1944
1945 total_encoded_size += statement.encoded_size();
1946
1947 let hash = statement.hash();
1948 expected_hashes.push(hash);
1949 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1950 }
1951
1952 assert!(
1954 total_encoded_size == max_size,
1955 "Total encoded size {} should be <= max_size {}",
1956 total_encoded_size,
1957 max_size
1958 );
1959
1960 handler.propagate_statements().await;
1961
1962 let sent = notification_service.get_sent_notifications();
1963
1964 assert_eq!(
1966 sent.len(),
1967 1,
1968 "Expected 1 notification for all {} statements, but got {}",
1969 num_statements,
1970 sent.len()
1971 );
1972
1973 let (_peer, notification) = &sent[0];
1974 assert!(
1975 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1976 "Notification size {} exceeds limit {}",
1977 notification.len(),
1978 MAX_STATEMENT_NOTIFICATION_SIZE
1979 );
1980
1981 let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
1982 assert_eq!(
1983 decoded.len(),
1984 num_statements,
1985 "Expected {} statements in the notification",
1986 num_statements
1987 );
1988
1989 let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
1991 expected_hashes.sort();
1992 received_hashes.sort();
1993 assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
1994 }
1995
1996 #[tokio::test]
1997 async fn test_initial_sync_burst_size_limit_consistency() {
1998 let (mut handler, statement_store, _network, notification_service) =
2009 build_handler_no_peers();
2010
2011 let payload_limit = max_statement_payload_size();
2012
2013 let first_stmt_data_size = payload_limit / 2 + 10;
2015 let mut stmt1 = Statement::new();
2016 stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
2017 let stmt1_encoded_size = stmt1.encoded_size();
2018
2019 let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
2022 let target_stmt2_encoded = remaining + 3; let stmt2_data_size = target_stmt2_encoded.saturating_sub(4); let mut stmt2 = Statement::new();
2025 stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
2026 let stmt2_encoded_size = stmt2.encoded_size();
2027
2028 let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
2029
2030 assert!(
2032 total_encoded > payload_limit,
2033 "Total {} should exceed payload_limit {} so filter rejects second statement",
2034 total_encoded,
2035 payload_limit
2036 );
2037
2038 let hash1 = stmt1.hash();
2039 let hash2 = stmt2.hash();
2040 statement_store.statements.lock().unwrap().insert(hash1, stmt1);
2041 statement_store.statements.lock().unwrap().insert(hash2, stmt2);
2042
2043 let peer_id = PeerId::random();
2045
2046 handler
2047 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
2048 peer: peer_id,
2049 direction: sc_network::service::traits::Direction::Inbound,
2050 handshake: vec![],
2051 negotiated_fallback: None,
2052 })
2053 .await;
2054
2055 assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2057 assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
2058
2059 handler.process_initial_sync_burst().await;
2061
2062 let sent = notification_service.get_sent_notifications();
2065 assert_eq!(sent.len(), 1, "First burst should send one notification");
2066
2067 let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
2068 assert_eq!(decoded.len(), 1, "First notification should contain one statement");
2069
2070 let sent_hash = decoded[0].hash();
2072 assert!(
2073 sent_hash == hash1 || sent_hash == hash2,
2074 "Sent statement should be one of the two created"
2075 );
2076
2077 assert!(handler.pending_initial_syncs.contains_key(&peer_id));
2079 assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
2080
2081 handler.process_initial_sync_burst().await;
2083
2084 let sent = notification_service.get_sent_notifications();
2085 assert_eq!(sent.len(), 2, "Second burst should send another notification");
2086
2087 let mut sent_hashes: Vec<_> = sent
2089 .iter()
2090 .flat_map(|(_, notification)| {
2091 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2092 })
2093 .map(|s| s.hash())
2094 .collect();
2095 sent_hashes.sort();
2096 let mut expected_hashes = vec![hash1, hash2];
2097 expected_hashes.sort();
2098 assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
2099
2100 assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
2102 }
2103
2104 #[tokio::test]
2105 async fn test_peer_disconnected_on_flooding() {
2106 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2107 build_handler();
2108
2109 let peer_id = *handler.peers.keys().next().unwrap();
2110
2111 let mut flood_statements = Vec::new();
2112 for i in 0..600_000 {
2113 let mut statement = Statement::new();
2114 statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2115 flood_statements.push(statement);
2116 }
2117
2118 handler.on_statements(peer_id, flood_statements);
2119
2120 let reports = network.get_reports();
2121 assert!(
2122 reports
2123 .iter()
2124 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2125 "Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2126 reports
2127 );
2128
2129 let disconnected = network.get_disconnected_peers();
2130 assert!(
2131 disconnected.contains(&peer_id),
2132 "Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2133 peer_id,
2134 disconnected
2135 );
2136
2137 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2139 assert!(
2140 !handler.pending_initial_syncs.contains_key(&peer_id),
2141 "Peer should be removed from pending_initial_syncs"
2142 );
2143 assert!(
2144 !handler.initial_sync_peer_queue.contains(&peer_id),
2145 "Peer should be removed from initial_sync_peer_queue"
2146 );
2147 }
2148
2149 #[tokio::test]
2150 async fn test_legitimate_traffic_not_flagged() {
2151 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2152 build_handler();
2153
2154 let peer_id = *handler.peers.keys().next().unwrap();
2155
2156 let start = std::time::Instant::now();
2157 let duration = std::time::Duration::from_secs(5);
2158 let mut counter = 0u32;
2159
2160 while start.elapsed() < duration {
2161 let mut statements = Vec::new();
2162 for i in 0..5_000 {
2163 let mut statement = Statement::new();
2164 statement.set_plain_data(vec![
2165 counter as u8,
2166 (counter >> 8) as u8,
2167 (counter >> 16) as u8,
2168 i as u8,
2169 ]);
2170 statements.push(statement);
2171 counter = counter.wrapping_add(1);
2172 }
2173
2174 handler.on_statements(peer_id, statements);
2175
2176 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2177 }
2178
2179 let reports = network.get_reports();
2180 assert!(
2181 !reports
2182 .iter()
2183 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2184 "Legitimate traffic should not trigger flooding detection. Reports: {:?}",
2185 reports
2186 );
2187
2188 let disconnected = network.get_disconnected_peers();
2189 assert!(
2190 !disconnected.contains(&peer_id),
2191 "Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
2192 disconnected
2193 );
2194
2195 assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
2196 }
2197
2198 #[tokio::test]
2199 async fn test_just_over_rate_limit_triggers_flooding() {
2200 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2201 build_handler();
2202
2203 let peer_id = *handler.peers.keys().next().unwrap();
2204
2205 let mut statements = Vec::new();
2206 for i in 0..260_000 {
2207 let mut statement = Statement::new();
2208 statement.set_plain_data(vec![
2209 i as u8,
2210 (i >> 8) as u8,
2211 (i >> 16) as u8,
2212 (i >> 24) as u8,
2213 ]);
2214 statements.push(statement);
2215 }
2216
2217 handler.on_statements(peer_id, statements);
2218
2219 let reports = network.get_reports();
2220 let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2221 assert!(
2222 reports
2223 .iter()
2224 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2225 "Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2226 expected_burst,
2227 reports
2228 );
2229
2230 let disconnected = network.get_disconnected_peers();
2231 assert!(
2232 disconnected.contains(&peer_id),
2233 "Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2234 disconnected
2235 );
2236
2237 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2238 }
2239
2240 #[tokio::test]
2241 async fn test_burst_of_250k_statements_allowed() {
2242 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2243 build_handler();
2244
2245 let peer_id = *handler.peers.keys().next().unwrap();
2246
2247 let mut statements = Vec::new();
2248 for i in 0..250_000 {
2249 let mut statement = Statement::new();
2250 statement.set_plain_data(vec![
2251 i as u8,
2252 (i >> 8) as u8,
2253 (i >> 16) as u8,
2254 (i >> 24) as u8,
2255 ]);
2256 statements.push(statement);
2257 }
2258
2259 handler.on_statements(peer_id, statements);
2260
2261 let reports = network.get_reports();
2262 assert!(
2263 !reports
2264 .iter()
2265 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2266 "250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2267 reports
2268 );
2269
2270 assert!(
2271 handler.peers.contains_key(&peer_id),
2272 "Peer should still be connected after 250k burst"
2273 );
2274 }
2275
2276 #[tokio::test]
2277 async fn test_sustained_rate_above_limit_triggers_flooding() {
2278 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2279 build_handler();
2280
2281 let peer_id = *handler.peers.keys().next().unwrap();
2282
2283 let mut counter = 0u32;
2284
2285 let start = std::time::Instant::now();
2286 let duration = std::time::Duration::from_secs(5);
2287
2288 let mut flooding_detected = false;
2289 while start.elapsed() < duration {
2290 let mut statements = Vec::new();
2291 for i in 0..30_000 {
2292 let mut statement = Statement::new();
2293 statement.set_plain_data(vec![
2294 counter as u8,
2295 (counter >> 8) as u8,
2296 (counter >> 16) as u8,
2297 i as u8,
2298 ]);
2299 statements.push(statement);
2300 counter = counter.wrapping_add(1);
2301 }
2302
2303 handler.on_statements(peer_id, statements);
2304
2305 let reports = network.get_reports();
2307 if reports
2308 .iter()
2309 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
2310 {
2311 flooding_detected = true;
2312 break;
2313 }
2314
2315 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2316 }
2317
2318 assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
2319
2320 let disconnected = network.get_disconnected_peers();
2321 assert!(
2322 disconnected.contains(&peer_id),
2323 "Peer should be disconnected after sustained high rate. Disconnected: {:?}",
2324 disconnected
2325 );
2326
2327 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2328 }
2329}