1use crate::config::*;
30
31use codec::{Compact, Decode, Encode, MaxEncodedLen};
32use futures::{
33 channel::oneshot,
34 future::{pending, FusedFuture},
35 prelude::*,
36 stream::FuturesUnordered,
37};
38use governor::{
39 clock::DefaultClock,
40 state::{InMemoryState, NotKeyed},
41 Quota, RateLimiter,
42};
43use prometheus_endpoint::{
44 exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
45 Registry, U64,
46};
47use rand::seq::IteratorRandom;
48use sc_network::{
49 config::{NonReservedPeerMode, SetConfig},
50 error, multiaddr,
51 peer_store::PeerStoreProvider,
52 service::{
53 traits::{NotificationEvent, NotificationService, ValidationResult},
54 NotificationMetrics,
55 },
56 types::ProtocolName,
57 utils::{interval, LruHashSet},
58 NetworkBackend, NetworkEventStream, NetworkPeers,
59};
60use sc_network_sync::{SyncEvent, SyncEventStream};
61use sc_network_types::PeerId;
62use sp_runtime::traits::Block as BlockT;
63use sp_statement_store::{
64 FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
65};
66use std::{
67 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
68 iter,
69 num::{NonZeroU32, NonZeroUsize},
70 pin::Pin,
71 sync::Arc,
72 time::Instant,
73};
74use tokio::time::timeout;
75pub mod config;
76
/// A set of statements, as exchanged over the gossip protocol.
pub type Statements = Vec<Statement>;
/// Future resolving to the statement store's verdict on a submitted statement.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
81
/// Reputation changes applied to peers based on their statement-gossip behaviour.
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Reputation change applied when a peer sends us any (not yet validated) statement.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Refund of [`ANY_STATEMENT`] once the statement turned out to be already known.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
	/// Reputation change when a peer sends us a statement the store imports as new.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
	/// Reputation change when a peer sends us a statement the store rejects as invalid.
	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
	/// Reputation change when a peer sends us the same statement more than once.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Fatal reputation change when a peer exceeds the statement rate limit.
	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
}
100
/// Log target used by this module.
const LOG_TARGET: &str = "statement-gossip";
/// Maximum time allowed for sending a single notification to a peer.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval between bursts of initial-sync statements sent to newly connected peers.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
/// How long the peer evicted for sync recovery stays out of the reserved set
/// before being re-added (see `try_readd_sync_recovery_peer`).
const SYNC_RECOVERY_READD_DELAY: std::time::Duration = std::time::Duration::from_secs(60);
108
/// Prometheus metrics for the statement gossip handler.
struct Metrics {
	/// Statements propagated to at least one peer.
	propagated_statements: Counter<U64>,
	/// Received statements that were already in the store.
	known_statements_received: Counter<U64>,
	/// Statements skipped because they exceed the notification size limit.
	skipped_oversized_statements: Counter<U64>,
	/// Distribution of chunk sizes used when propagating statements.
	propagated_statements_chunks: Histogram,
	/// Number of statement validations currently in flight.
	pending_statements: Gauge<U64>,
	/// Statements dropped because the pending-validation limit was hit.
	ignored_statements: Counter<U64>,
	/// Peers currently connected on the statement protocol.
	peers_connected: Gauge<U64>,
	/// Total statements received from peers.
	statements_received: Counter<U64>,
	/// Total bytes sent on the statement protocol.
	bytes_sent_total: Counter<U64>,
	/// Total bytes received on the statement protocol.
	bytes_received_total: Counter<U64>,
	/// Time spent sending statement notifications to peers.
	sent_latency_seconds: Histogram,
	/// Statements sent during initial-sync bursts to newly connected peers.
	initial_sync_statements_sent: Counter<U64>,
	/// Number of initial-sync burst rounds processed.
	initial_sync_bursts_total: Counter<U64>,
	/// Peers currently being served an initial sync.
	initial_sync_peers_active: Gauge<U64>,
	/// Per-peer duration of the initial sync, start to completion.
	initial_sync_duration_seconds: Histogram,
	/// Peers disconnected for exceeding the statement rate limit.
	statement_flooding_detected: Counter<U64>,
}
127
impl Metrics {
	/// Register all statement-gossip metrics with the given Prometheus registry.
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			propagated_statements: register(
				Counter::new(
					"substrate_sync_propagated_statements",
					"Number of statements propagated to at least one peer",
				)?,
				r,
			)?,
			known_statements_received: register(
				Counter::new(
					"substrate_sync_known_statement_received",
					"Number of statements received via gossiping that were already in the statement store",
				)?,
				r,
			)?,
			skipped_oversized_statements: register(
				Counter::new(
					"substrate_sync_skipped_oversized_statements",
					"Number of oversized statements that were skipped to be gossiped",
				)?,
				r,
			)?,
			propagated_statements_chunks: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_propagated_statements_chunks",
						"Distribution of chunk sizes when propagating statements",
					)
					// Powers of two from 1 to 8192.
					.buckets(exponential_buckets(1.0, 2.0, 14)?),
				)?,
				r,
			)?,
			pending_statements: register(
				Gauge::new(
					"substrate_sync_pending_statement_validations",
					"Number of pending statement validations",
				)?,
				r,
			)?,
			ignored_statements: register(
				Counter::new(
					"substrate_sync_ignored_statements",
					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
				)?,
				r,
			)?,
			peers_connected: register(
				Gauge::new(
					"substrate_sync_statement_peers_connected",
					"Number of peers connected using the statement protocol",
				)?,
				r,
			)?,
			statements_received: register(
				Counter::new(
					"substrate_sync_statements_received",
					"Total number of statements received from peers",
				)?,
				r,
			)?,
			bytes_sent_total: register(
				Counter::new(
					"substrate_sync_statement_bytes_sent_total",
					"Total bytes sent for statement protocol messages",
				)?,
				r,
			)?,
			bytes_received_total: register(
				Counter::new(
					"substrate_sync_statement_bytes_received_total",
					"Total bytes received for statement protocol messages",
				)?,
				r,
			)?,
			sent_latency_seconds: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_statement_sent_latency_seconds",
						"Time to send statement messages to peers",
					)
					// Sub-microsecond up to one second.
					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
				)?,
				r,
			)?,
			initial_sync_statements_sent: register(
				Counter::new(
					"substrate_sync_initial_sync_statements_sent",
					"Total statements sent during initial sync bursts to newly connected peers",
				)?,
				r,
			)?,
			initial_sync_bursts_total: register(
				Counter::new(
					"substrate_sync_initial_sync_bursts_total",
					"Total number of initial sync burst rounds processed",
				)?,
				r,
			)?,
			initial_sync_peers_active: register(
				Gauge::new(
					"substrate_sync_initial_sync_peers_active",
					"Number of peers currently being synced via initial sync",
				)?,
				r,
			)?,
			initial_sync_duration_seconds: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_initial_sync_duration_seconds",
						"Per-peer total duration of initial sync from start to completion",
					)
					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
				)?,
				r,
			)?,
			statement_flooding_detected: register(
				Counter::new(
					"substrate_sync_statement_flooding_detected",
					"Number of peers disconnected for exceeding statement rate limits",
				)?,
				r,
			)?,
		})
	}
}
256
/// Prototype for a [`StatementHandler`]; created first so the notification
/// protocol can be registered with the network, then turned into the actual
/// handler via `build`.
pub struct StatementHandlerPrototype {
	/// Name of the statement gossip notifications protocol.
	protocol_name: ProtocolName,
	/// Handle used to send and receive notifications for this protocol.
	notification_service: Box<dyn NotificationService>,
}
262
impl StatementHandlerPrototype {
	/// Create a new prototype together with the notification protocol
	/// configuration that must be registered with the network backend.
	///
	/// The protocol name is derived from `genesis_hash` and the optional
	/// `fork_id`.
	pub fn new<
		Hash: AsRef<[u8]>,
		Block: BlockT,
		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
	>(
		genesis_hash: Hash,
		fork_id: Option<&str>,
		metrics: NotificationMetrics,
		peer_store_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, Net::NotificationProtocolConfig) {
		let genesis_hash = genesis_hash.as_ref();
		let protocol_name = if let Some(fork_id) = fork_id {
			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
		} else {
			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
		};
		// Peers are managed exclusively through the reserved set (see
		// `handle_sync_event`), hence zero slots and `NonReservedPeerMode::Deny`.
		let (config, notification_service) = Net::notification_config(
			protocol_name.clone().into(),
			Vec::new(),
			MAX_STATEMENT_NOTIFICATION_SIZE,
			None,
			SetConfig {
				in_peers: 0,
				out_peers: 0,
				reserved_nodes: Vec::new(),
				non_reserved_mode: NonReservedPeerMode::Deny,
			},
			metrics,
			peer_store_handle,
		);

		(Self { protocol_name: protocol_name.into(), notification_service }, config)
	}

	/// Turn the prototype into the actual statement handler.
	///
	/// Spawns `num_submission_workers` background tasks (via `executor`) that
	/// take statements off a shared queue, submit them to the store and
	/// report the result back through a oneshot channel. Zero values for
	/// `num_submission_workers` and `statements_per_second` are replaced by
	/// safe defaults (with a warning).
	pub fn build<
		N: NetworkPeers + NetworkEventStream,
		S: SyncEventStream + sp_consensus::SyncOracle,
	>(
		self,
		network: N,
		sync: S,
		statement_store: Arc<dyn StatementStore>,
		metrics_registry: Option<&Registry>,
		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
		mut num_submission_workers: usize,
		statements_per_second: u32,
	) -> error::Result<StatementHandler<N, S>> {
		let sync_event_stream = sync.event_stream("statement-handler-sync");
		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);

		if num_submission_workers == 0 {
			log::warn!(
				target: LOG_TARGET,
				"num_submission_workers is 0, defaulting to 1"
			);
			num_submission_workers = 1;
		}

		let statements_per_second = match NonZeroU32::new(statements_per_second) {
			Some(rate) => rate,
			None => {
				log::warn!(
					target: LOG_TARGET,
					"statements_per_second is 0, defaulting to {}",
					DEFAULT_STATEMENTS_PER_SECOND
				);
				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
			},
		};

		let metrics =
			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };

		// Spawn the statement-submission worker tasks. All workers share the
		// same bounded queue (`async_channel` receivers are clonable).
		for _ in 0..num_submission_workers {
			let store = statement_store.clone();
			let mut queue_receiver = queue_receiver.clone();
			executor(
				async move {
					loop {
						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
							queue_receiver.next().await;
						match task {
							// Channel closed: all senders are gone, shut down.
							None => return,
							Some((statement, completion)) => {
								let result = store.submit(statement, StatementSource::Network);
								if completion.send(result).is_err() {
									log::debug!(
										target: LOG_TARGET,
										"Error sending validation completion"
									);
								}
							},
						}
					}
				}
				.boxed(),
			);
		}

		let handler = StatementHandler {
			protocol_name: self.protocol_name,
			notification_service: self.notification_service,
			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream: sync_event_stream.fuse(),
			peers: HashMap::new(),
			statement_store,
			queue_sender,
			statements_per_second,
			metrics,
			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			// Disarmed until `start_sync_recovery` arms it.
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		};

		Ok(handler)
	}
}
397
/// Handler for statement gossiping. Call [`StatementHandler::run`] to start
/// the event-processing loop.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
> {
	/// Name of the statement gossip notifications protocol.
	protocol_name: ProtocolName,
	/// Interval stream driving periodic statement propagation.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// Pending statement validations; each future resolves with the statement
	/// hash and the store's verdict (or `None` if the worker dropped it).
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	/// Peers that sent us each statement currently being validated, so the
	/// same statement is not imported multiple times concurrently and all
	/// senders can be credited/punished once the verdict is in.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	/// Network service used to report reputation and manage the reserved set.
	network: N,
	/// Syncing service, used to query the major-sync state.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// Notification service for the statement protocol.
	notification_service: Box<dyn NotificationService>,
	/// All connected peers and their per-peer state.
	peers: HashMap<PeerId, Peer>,
	/// The statement store.
	statement_store: Arc<dyn StatementStore>,
	/// Queue feeding the statement-submission worker tasks.
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	/// Per-peer incoming statement rate limit.
	statements_per_second: NonZeroU32,
	/// Prometheus metrics (present only when a registry was supplied).
	metrics: Option<Metrics>,
	/// Timer driving the periodic initial-sync bursts.
	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	/// Remaining initial-sync work, per peer.
	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
	/// Round-robin queue of peers with a pending initial sync.
	initial_sync_peer_queue: VecDeque<PeerId>,
	/// Peers whose reserved-set registration is postponed until the major
	/// sync is finished.
	deferred_peers: HashSet<PeerId>,
	/// Whether incoming statements were dropped during a major sync, i.e. a
	/// recovery round is needed once the sync finishes.
	dropped_statements_during_sync: bool,
	/// Peer temporarily evicted to force a reconnect for statement recovery.
	sync_recovery_peer: Option<PeerId>,
	/// Timer after which the recovery peer is re-added to the reserved set;
	/// disarmed (`pending()`) while no recovery is in flight.
	sync_recovery_readd_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
}
446
/// Per-peer rate limiter used to detect statement flooding.
#[derive(Debug)]
struct PeerRateLimiter {
	/// Direct (single-bucket, in-memory) rate limiter from the `governor` crate.
	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
455
456impl PeerRateLimiter {
457 fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
458 let quota = Quota::per_second(statements_per_second).allow_burst(burst);
459 Self { limiter: RateLimiter::direct(quota) }
460 }
461
462 fn is_flooding(&self, count: usize) -> bool {
464 if count > u32::MAX as usize {
465 return true;
466 }
467
468 let Some(n) = NonZeroU32::new(count as u32) else {
469 return false;
470 };
471 !matches!(self.limiter.check_n(n), Ok(Ok(())))
472 }
473}
474
/// Per-peer state tracked by the statement handler.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
	/// Statements known to the peer (sent to or received from it), bounded LRU.
	known_statements: LruHashSet<Hash>,
	/// Limits the rate of statements accepted from this peer.
	rate_limiter: PeerRateLimiter,
}
484
/// Progress of the initial statement sync for one newly connected peer.
struct PendingInitialSync {
	/// Hashes of statements still to be sent to the peer.
	hashes: Vec<Hash>,
	/// When the initial sync started; used for the duration metric.
	started_at: Instant,
}
490
/// Outcome of [`find_sendable_chunk`].
enum ChunkResult {
	/// Send the first `n` statements as a single notification.
	Send(usize),
	/// The first statement alone exceeds the notification size limit; skip it.
	SkipOversized,
}
498
/// Outcome of `StatementHandler::send_statement_chunk`.
enum SendChunkResult {
	/// A chunk of `n` statements was sent successfully.
	Sent(usize),
	/// The oversized head statement was skipped.
	Skipped,
	/// There was nothing to send.
	Empty,
	/// Sending failed (timed out or notification error).
	Failed,
}
510
/// Maximum payload available for statements in a single notification: the
/// notification size limit minus the worst-case SCALE length prefix of the
/// encoded statement vector.
fn max_statement_payload_size() -> usize {
	MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len()
}
518
519fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
526 if statements.is_empty() {
527 return ChunkResult::Send(0);
528 }
529 let max_size = max_statement_payload_size();
530
531 let mut accumulated_size = 0;
536 let mut count = 0usize;
537
538 for stmt in &statements[0..] {
539 let stmt_size = stmt.encoded_size();
540 let new_count = count + 1;
541 let new_total = accumulated_size + stmt_size;
543 if new_total > max_size {
544 break;
545 }
546
547 accumulated_size += stmt_size;
548 count = new_count;
549 }
550
551 if count == 0 {
553 ChunkResult::SkipOversized
554 } else {
555 ChunkResult::Send(count)
556 }
557}
558
impl Peer {
	/// Construct a `Peer` directly; available for tests only.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn new_for_testing(
		known_statements: LruHashSet<Hash>,
		statements_per_second: NonZeroU32,
		burst: NonZeroU32,
	) -> Self {
		Self { known_statements, rate_limiter: PeerRateLimiter::new(statements_per_second, burst) }
	}
}
570
571impl<N, S> StatementHandler<N, S>
572where
573 N: NetworkPeers + NetworkEventStream,
574 S: SyncEventStream + sp_consensus::SyncOracle,
575{
	/// Construct a [`StatementHandler`] directly, bypassing
	/// [`StatementHandlerPrototype::build`]; available for tests only.
	/// Metrics are disabled and the initial-sync / recovery timers are left
	/// disarmed (`pending()`).
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn new_for_testing(
		protocol_name: ProtocolName,
		notification_service: Box<dyn NotificationService>,
		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
		network: N,
		sync: S,
		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
		peers: HashMap<PeerId, Peer>,
		statement_store: Arc<dyn StatementStore>,
		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
		statements_per_second: NonZeroU32,
	) -> Self {
		Self {
			protocol_name,
			notification_service,
			propagate_timeout,
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream,
			peers,
			statement_store,
			queue_sender,
			statements_per_second,
			metrics: None,
			initial_sync_timeout: Box::pin(pending().fuse()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		}
	}
613
	/// Mutable access to the in-flight statement validations; available for
	/// tests only.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn pending_statements_mut(
		&mut self,
	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
	{
		&mut self.pending_statements
	}
622
	/// Drive the whole gossip protocol: propagation timer, statement import
	/// results, sync events, notification events, initial-sync bursts and
	/// sync recovery. Runs until the sync event stream or the notification
	/// service terminates.
	pub async fn run(mut self) {
		loop {
			// `select_biased!` polls branches in the order written, so the
			// propagation timer has priority over the other event sources.
			futures::select_biased! {
				_ = self.propagate_timeout.next() => {
					self.propagate_statements().await;
					self.metrics.as_ref().map(|metrics| {
						metrics.pending_statements.set(self.pending_statements.len() as u64);
					});
				},
				(hash, result) = self.pending_statements.select_next_some() => {
					// A validation finished: credit/punish every peer that
					// sent us this statement.
					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
						if let Some(result) = result {
							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
						}
					} else {
						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
					}
				},
				sync_event = self.sync_event_stream.next() => {
					if let Some(sync_event) = sync_event {
						self.handle_sync_event(sync_event);
					} else {
						// Sync event stream ended: shut down.
						return;
					}
				}
				event = self.notification_service.next_event().fuse() => {
					if let Some(event) = event {
						self.handle_notification_event(event).await
					} else {
						// Notification service ended: shut down.
						return
					}
				}
				_ = &mut self.initial_sync_timeout => {
					// Process one initial-sync burst and re-arm the timer.
					self.process_initial_sync_burst().await;
					self.initial_sync_timeout =
						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
				},
				_ = &mut self.sync_recovery_readd_timeout => {
					self.try_readd_sync_recovery_peer();
					// Disarm until the next recovery round arms it again.
					self.sync_recovery_readd_timeout = Box::pin(pending().fuse());
				},
			}

			// After any event: once the major sync is over, register deferred
			// peers and start statement recovery if statements were dropped.
			if !self.sync.is_major_syncing() {
				self.drain_deferred_peers();
				self.start_sync_recovery();
			}
		}
	}
676
	/// Send one size-limited chunk taken from the front of `statements` to
	/// `peer`. Returns how much of the slice was consumed (or why nothing
	/// was sent).
	async fn send_statement_chunk(
		&mut self,
		peer: &PeerId,
		statements: &[&Statement],
	) -> SendChunkResult {
		match find_sendable_chunk(statements) {
			ChunkResult::Send(0) => SendChunkResult::Empty,
			ChunkResult::Send(chunk_end) => {
				let chunk = &statements[..chunk_end];
				let encoded = chunk.encode();
				let bytes_to_send = encoded.len() as u64;

				// Time the (possibly back-pressured) notification send; the
				// histogram timer records on drop.
				let sent_latency_timer =
					self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
				let send_result = timeout(
					SEND_TIMEOUT,
					self.notification_service.send_async_notification(peer, encoded),
				)
				.await;
				drop(sent_latency_timer);

				// NOTE(review): only the outer (timeout) error is checked
				// here; if `send_async_notification` itself returns a
				// `Result`, an inner `Err` would be treated as success —
				// confirm against the trait definition.
				if let Err(e) = send_result {
					log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
					return SendChunkResult::Failed;
				}

				log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
				self.metrics.as_ref().map(|metrics| {
					metrics.propagated_statements.inc_by(chunk.len() as u64);
					metrics.bytes_sent_total.inc_by(bytes_to_send);
					metrics.propagated_statements_chunks.observe(chunk.len() as f64);
				});
				SendChunkResult::Sent(chunk_end)
			},
			ChunkResult::SkipOversized => {
				// The head statement alone does not fit into a notification.
				log::warn!(target: LOG_TARGET, "Statement too large, skipping");
				self.metrics.as_ref().map(|metrics| {
					metrics.skipped_oversized_statements.inc();
				});
				SendChunkResult::Skipped
			},
		}
	}
721
722 fn drain_deferred_peers(&mut self) {
724 if self.deferred_peers.is_empty() {
725 return;
726 }
727
728 log::debug!(
729 target: LOG_TARGET,
730 "Major sync complete, adding {} deferred statement peers",
731 self.deferred_peers.len(),
732 );
733
734 let addrs: HashSet<multiaddr::Multiaddr> = self
735 .deferred_peers
736 .drain()
737 .map(|p| {
738 iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
739 })
740 .collect();
741
742 if let Err(err) = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs)
743 {
744 log::warn!(target: LOG_TARGET, "Failed to add deferred peers: {err}");
745 }
746 }
747
	/// After a major sync during which statements were dropped, force one
	/// random peer to reconnect so we receive a fresh initial sync from it.
	///
	/// Removing the peer from the reserved set drops the substream; when the
	/// peer later reconnects, `handle_notification_event` schedules a new
	/// initial sync. The peer is re-added after `SYNC_RECOVERY_READD_DELAY`.
	fn start_sync_recovery(&mut self) {
		if !self.dropped_statements_during_sync {
			return;
		}
		// NOTE(review): the flag is cleared even when a recovery is already
		// in flight (early return below) — presumably intentional, since the
		// ongoing recovery covers the newly dropped statements too; confirm.
		self.dropped_statements_during_sync = false;

		if self.sync_recovery_peer.is_some() {
			return;
		}

		// Pick a random currently connected peer; bail if there are none.
		let Some(&peer_id) = self.peers.keys().choose(&mut rand::thread_rng()) else {
			return;
		};

		log::trace!(
			target: LOG_TARGET,
			"Major sync complete, force-reconnecting {peer_id} for statement recovery",
		);

		if let Err(err) = self.network.remove_peers_from_reserved_set(
			self.protocol_name.clone(),
			iter::once(peer_id).collect(),
		) {
			log::warn!(target: LOG_TARGET, "Failed to remove peer {peer_id} for sync recovery: {err}");
			return;
		}

		self.sync_recovery_peer = Some(peer_id);
		// Arm the timer that will re-add the peer to the reserved set.
		self.sync_recovery_readd_timeout =
			Box::pin(tokio::time::sleep(SYNC_RECOVERY_READD_DELAY).fuse());
	}
783
784 fn try_readd_sync_recovery_peer(&mut self) {
786 let Some(peer_id) = self.sync_recovery_peer.take() else { return };
787 log::trace!(
788 target: LOG_TARGET,
789 "Re-adding {peer_id} to reserved set after sync recovery window",
790 );
791 let addr =
792 iter::once(multiaddr::Protocol::P2p(peer_id.into())).collect::<multiaddr::Multiaddr>();
793 if let Err(err) = self
794 .network
795 .add_peers_to_reserved_set(self.protocol_name.clone(), iter::once(addr).collect())
796 {
797 log::warn!(target: LOG_TARGET, "Failed to re-add sync recovery peer {peer_id}: {err}");
798 }
799 }
800
	/// Mirror peers connecting/disconnecting on the sync protocol into this
	/// protocol's reserved set.
	fn handle_sync_event(&mut self, event: SyncEvent) {
		match event {
			SyncEvent::PeerConnected(remote) => {
				// During a major sync we do not gossip: remember the peer and
				// register it once the sync finishes (`drain_deferred_peers`).
				if self.sync.is_major_syncing() {
					log::trace!(
						target: LOG_TARGET,
						"Major sync in progress, deferring connection to {remote}",
					);
					self.deferred_peers.insert(remote);
					return;
				}
				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
					.collect::<multiaddr::Multiaddr>();
				let result = self.network.add_peers_to_reserved_set(
					self.protocol_name.clone(),
					iter::once(addr).collect(),
				);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
				}
			},
			SyncEvent::PeerDisconnected(remote) => {
				// A deferred peer was never added to the reserved set, so
				// there is nothing to remove.
				if self.deferred_peers.remove(&remote) {
					return;
				}
				let result = self.network.remove_peers_from_reserved_set(
					self.protocol_name.clone(),
					iter::once(remote).collect(),
				);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
				}
			},
		}
	}
836
	/// Dispatch a single event from the notification service: substream
	/// validation, peer connect/disconnect, or an incoming notification.
	async fn handle_notification_event(&mut self, event: NotificationEvent) {
		match event {
			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
				// Accept the inbound substream only when the handshake maps
				// to a known peer role.
				let result = self
					.network
					.peer_role(peer, handshake)
					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
				let _ = result_tx.send(result);
			},
			NotificationEvent::NotificationStreamOpened { peer, .. } => {
				let _was_in = self.peers.insert(
					peer,
					Peer {
						known_statements: LruHashSet::new(
							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
						),
						rate_limiter: PeerRateLimiter::new(
							self.statements_per_second,
							// Burst capacity is a fixed multiple of the rate.
							NonZeroU32::new(
								self.statements_per_second.get() *
									config::STATEMENTS_BURST_COEFFICIENT,
							)
							.expect("burst capacity is nonzero"),
						),
					},
				);
				debug_assert!(_was_in.is_none());

				self.metrics.as_ref().map(|metrics| {
					metrics.peers_connected.set(self.peers.len() as u64);
				});

				// Schedule an initial sync of all stored statements to the
				// new peer, unless we are in a major sync.
				if !self.sync.is_major_syncing() {
					let hashes = self.statement_store.statement_hashes();
					if !hashes.is_empty() {
						self.pending_initial_syncs.insert(
							peer,
							PendingInitialSync { hashes, started_at: Instant::now() },
						);
						self.initial_sync_peer_queue.push_back(peer);
						self.metrics.as_ref().map(|metrics| {
							metrics.initial_sync_peers_active.inc();
						});
					}
				}
			},
			NotificationEvent::NotificationStreamClosed { peer } => {
				let _peer = self.peers.remove(&peer);
				debug_assert!(_peer.is_some());
				// An in-flight initial sync for this peer counts as finished
				// for metric purposes.
				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
					self.metrics.as_ref().map(|metrics| {
						metrics.initial_sync_peers_active.dec();
						metrics
							.initial_sync_duration_seconds
							.observe(pending.started_at.elapsed().as_secs_f64());
					});
				}
				self.initial_sync_peer_queue.retain(|p| *p != peer);
				self.metrics.as_ref().map(|metrics| {
					metrics.peers_connected.set(self.peers.len() as u64);
				});
			},
			NotificationEvent::NotificationReceived { peer, notification } => {
				let bytes_received = notification.len() as u64;
				self.metrics.as_ref().map(|metrics| {
					metrics.bytes_received_total.inc_by(bytes_received);
				});

				// Statements arriving during a major sync are dropped; flag
				// it so a recovery round runs after the sync completes.
				if self.sync.is_major_syncing() {
					log::trace!(
						target: LOG_TARGET,
						"{peer}: Ignoring statements while major syncing or offline"
					);
					self.dropped_statements_during_sync = true;
					return;
				}

				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
					self.on_statements(peer, statements);
				} else {
					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
				}
			},
		}
	}
924
925 #[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
927 pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
928 log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
929
930 self.metrics.as_ref().map(|metrics| {
931 metrics.statements_received.inc_by(statements.len() as u64);
932 });
933
934 if let Some(ref mut peer) = self.peers.get_mut(&who) {
935 if peer.rate_limiter.is_flooding(statements.len()) {
936 log::warn!(
937 target: LOG_TARGET,
938 "Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
939 who,
940 self.statements_per_second
941 );
942
943 self.network.report_peer(who, rep::STATEMENT_FLOODING);
944 self.network.disconnect_peer(who, self.protocol_name.clone());
945 if let Some(ref metrics) = self.metrics {
946 metrics.statement_flooding_detected.inc();
947 }
948
949 self.peers.remove(&who);
951 self.pending_initial_syncs.remove(&who);
952 self.initial_sync_peer_queue.retain(|p| *p != who);
953
954 return;
955 }
956
957 let mut statements_left = statements.len() as u64;
958 for s in statements {
959 if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
960 log::debug!(
961 target: LOG_TARGET,
962 "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
963 statements_left,
964 MAX_PENDING_STATEMENTS,
965 );
966 self.metrics.as_ref().map(|metrics| {
967 metrics.ignored_statements.inc_by(statements_left);
968 });
969 break;
970 }
971
972 let hash = s.hash();
973 peer.known_statements.insert(hash);
974
975 if self.statement_store.has_statement(&hash) {
976 self.metrics.as_ref().map(|metrics| {
977 metrics.known_statements_received.inc();
978 });
979
980 if let Some(peers) = self.pending_statements_peers.get(&hash) {
981 if peers.contains(&who) {
982 log::trace!(
983 target: LOG_TARGET,
984 "Already received the statement from the same peer {who}.",
985 );
986 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
987 }
988 }
989 continue;
990 }
991
992 self.network.report_peer(who, rep::ANY_STATEMENT);
993
994 match self.pending_statements_peers.entry(hash) {
995 Entry::Vacant(entry) => {
996 let (completion_sender, completion_receiver) = oneshot::channel();
997 match self.queue_sender.try_send((s, completion_sender)) {
998 Ok(()) => {
999 self.pending_statements.push(
1000 async move {
1001 let res = completion_receiver.await;
1002 (hash, res.ok())
1003 }
1004 .boxed(),
1005 );
1006 entry.insert(HashSet::from_iter([who]));
1007 },
1008 Err(async_channel::TrySendError::Full(_)) => {
1009 log::debug!(
1010 target: LOG_TARGET,
1011 "Dropped statement because validation channel is full",
1012 );
1013 },
1014 Err(async_channel::TrySendError::Closed(_)) => {
1015 log::trace!(
1016 target: LOG_TARGET,
1017 "Dropped statement because validation channel is closed",
1018 );
1019 },
1020 }
1021 },
1022 Entry::Occupied(mut entry) => {
1023 if !entry.get_mut().insert(who) {
1024 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
1026 }
1027 },
1028 }
1029
1030 statements_left -= 1;
1031 }
1032 }
1033 }
1034
1035 fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
1036 match import {
1037 SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
1038 SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
1039 SubmitResult::KnownExpired => {},
1040 SubmitResult::Rejected(_) => {},
1041 SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
1042 SubmitResult::InternalError(_) => {},
1043 }
1044 }
1045
1046 pub async fn propagate_statement(&mut self, hash: &Hash) {
1048 if self.sync.is_major_syncing() {
1050 return;
1051 }
1052
1053 log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
1054 if let Ok(Some(statement)) = self.statement_store.statement(hash) {
1055 self.do_propagate_statements(&[(*hash, statement)]).await;
1056 }
1057 }
1058
1059 async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
1063 let Some(peer) = self.peers.get_mut(who) else {
1064 return;
1065 };
1066
1067 let to_send: Vec<_> = statements
1068 .iter()
1069 .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
1070 .collect();
1071
1072 log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
1073
1074 if to_send.is_empty() {
1075 return;
1076 }
1077
1078 self.send_statements_in_chunks(who, &to_send).await;
1079 }
1080
	/// Send `statements` to `who` in as many size-limited chunks as needed.
	/// Stops early when nothing can be sent or a send fails; an oversized
	/// head statement is skipped one at a time.
	async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
		let mut offset = 0;
		while offset < statements.len() {
			match self.send_statement_chunk(who, &statements[offset..]).await {
				SendChunkResult::Sent(chunk_end) => {
					offset += chunk_end;
				},
				SendChunkResult::Skipped => {
					// Oversized head statement: skip it and continue.
					offset += 1;
				},
				SendChunkResult::Empty | SendChunkResult::Failed => return,
			}
		}
	}
1096
1097 async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
1098 log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
1099 let peers: Vec<_> = self.peers.keys().copied().collect();
1100 for who in peers {
1101 log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
1102 self.send_statements_to_peer(&who, statements).await;
1103 }
1104 log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
1105 }
1106
1107 async fn propagate_statements(&mut self) {
1109 if self.sync.is_major_syncing() {
1111 return;
1112 }
1113
1114 let Ok(statements) = self.statement_store.take_recent_statements() else { return };
1115 if !statements.is_empty() {
1116 self.do_propagate_statements(&statements).await;
1117 }
1118 }
1119
1120 fn record_initial_sync_completion(&self, started_at: Instant) {
1122 self.metrics.as_ref().map(|metrics| {
1123 metrics.initial_sync_peers_active.dec();
1124 metrics
1125 .initial_sync_duration_seconds
1126 .observe(started_at.elapsed().as_secs_f64());
1127 });
1128 }
1129
	/// Send one size-limited burst of stored statements to the next peer in
	/// the initial-sync round-robin queue. Called every
	/// `INITIAL_SYNC_BURST_INTERVAL` from `run`.
	async fn process_initial_sync_burst(&mut self) {
		if self.sync.is_major_syncing() {
			return;
		}

		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
			return;
		};

		// The peer may have disconnected since it was queued.
		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
			return;
		};

		self.metrics.as_ref().map(|metrics| {
			metrics.initial_sync_bursts_total.inc();
		});

		if entry.get().hashes.is_empty() {
			let started_at = entry.get().started_at;
			entry.remove();
			self.record_initial_sync_completion(started_at);
			return;
		}

		// Greedily take statements from the store until one more would exceed
		// the notification payload limit. The `accumulated_size > 0` guard
		// means the first statement is always taken, so an oversized
		// statement cannot stall the sync.
		let max_size = max_statement_payload_size();
		let mut accumulated_size = 0;
		let (statements, processed) = match self.statement_store.statements_by_hashes(
			&entry.get().hashes,
			&mut |_hash, encoded, _stmt| {
				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
					return FilterDecision::Abort;
				}
				accumulated_size += encoded.len();
				FilterDecision::Take
			},
		) {
			Ok(r) => r,
			Err(e) => {
				// Store failure: abandon this peer's initial sync.
				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
				let started_at = entry.get().started_at;
				entry.remove();
				self.record_initial_sync_completion(started_at);
				return;
			},
		};

		// Drop the hashes the store has looked at, whether taken or not.
		entry.get_mut().hashes.drain(..processed);
		let has_more = !entry.get().hashes.is_empty();
		// Release the map borrow before the mutable `self` calls below.
		drop(entry);

		let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
		match self.send_statement_chunk(&peer_id, &to_send).await {
			SendChunkResult::Failed => {
				// Give up on this peer; the send path already logged why.
				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
					self.record_initial_sync_completion(pending.started_at);
				}
				return;
			},
			SendChunkResult::Sent(sent) => {
				debug_assert_eq!(to_send.len(), sent);
				self.metrics.as_ref().map(|metrics| {
					metrics.initial_sync_statements_sent.inc_by(sent as u64);
				});
				// Record what the peer now knows so regular propagation does
				// not resend these statements.
				if let Some(peer) = self.peers.get_mut(&peer_id) {
					for (hash, _) in &statements {
						peer.known_statements.insert(*hash);
					}
				}
			},
			SendChunkResult::Empty | SendChunkResult::Skipped => {},
		}

		if has_more {
			// More work for this peer: back to the end of the queue.
			self.initial_sync_peer_queue.push_back(peer_id);
		} else {
			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
				self.record_initial_sync_completion(pending.started_at);
			}
		}
	}
1216}
1217
1218#[cfg(test)]
1219mod tests {
1220
1221 use super::*;
1222 use std::sync::{
1223 atomic::{AtomicBool, Ordering},
1224 Mutex,
1225 };
1226
	/// Mock network that records every call the handler makes so tests can
	/// assert on reputation reports, disconnects, and reserved-set changes.
	#[derive(Clone)]
	struct TestNetwork {
		// Reputation changes passed to `report_peer`.
		reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
		// Peers passed to `disconnect_peer`.
		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
		// Address sets passed to `add_peers_to_reserved_set`.
		added_reserved: Arc<Mutex<Vec<HashSet<sc_network::Multiaddr>>>>,
		// Peer lists passed to `remove_peers_from_reserved_set`.
		removed_reserved: Arc<Mutex<Vec<Vec<PeerId>>>>,
	}
1234
1235 impl TestNetwork {
1236 fn new() -> Self {
1237 Self {
1238 reported_peers: Arc::new(Mutex::new(Vec::new())),
1239 disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1240 added_reserved: Arc::new(Mutex::new(Vec::new())),
1241 removed_reserved: Arc::new(Mutex::new(Vec::new())),
1242 }
1243 }
1244
1245 fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
1246 self.reported_peers.lock().unwrap().clone()
1247 }
1248
1249 fn get_disconnected_peers(&self) -> Vec<PeerId> {
1250 self.disconnected_peers.lock().unwrap().clone()
1251 }
1252
1253 fn get_added_reserved(&self) -> Vec<HashSet<sc_network::Multiaddr>> {
1254 self.added_reserved.lock().unwrap().clone()
1255 }
1256
1257 fn get_removed_reserved(&self) -> Vec<Vec<PeerId>> {
1258 self.removed_reserved.lock().unwrap().clone()
1259 }
1260 }
1261
	// Only the entry points the statement handler actually exercises are
	// implemented; the remaining trait methods panic if unexpectedly called.
	#[async_trait::async_trait]
	impl NetworkPeers for TestNetwork {
		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
			unimplemented!()
		}

		fn set_authorized_only(&self, _: bool) {
			unimplemented!()
		}

		fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
			unimplemented!()
		}

		// Recorded for test assertions via `get_reports`.
		fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
		}

		fn peer_reputation(&self, _: &PeerId) -> i32 {
			unimplemented!()
		}

		// Recorded for test assertions via `get_disconnected_peers`.
		fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
			self.disconnected_peers.lock().unwrap().push(peer);
		}

		fn accept_unreserved_peers(&self) {
			unimplemented!()
		}

		fn deny_unreserved_peers(&self) {
			unimplemented!()
		}

		fn add_reserved_peer(
			&self,
			_: sc_network::config::MultiaddrWithPeerId,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_reserved_peer(&self, _: PeerId) {
			unimplemented!()
		}

		fn set_reserved_peers(
			&self,
			_: sc_network::ProtocolName,
			_: std::collections::HashSet<sc_network::Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		// Recorded for test assertions via `get_added_reserved`.
		fn add_peers_to_reserved_set(
			&self,
			_: sc_network::ProtocolName,
			addrs: std::collections::HashSet<sc_network::Multiaddr>,
		) -> Result<(), String> {
			self.added_reserved.lock().unwrap().push(addrs);
			Ok(())
		}

		// Recorded for test assertions via `get_removed_reserved`.
		fn remove_peers_from_reserved_set(
			&self,
			_: sc_network::ProtocolName,
			peers: Vec<PeerId>,
		) -> Result<(), String> {
			self.removed_reserved.lock().unwrap().push(peers);
			Ok(())
		}

		fn sync_num_connected(&self) -> usize {
			unimplemented!()
		}

		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
			unimplemented!()
		}

		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
			unimplemented!();
		}
	}
1345
	/// Mock sync oracle whose "major syncing" state can be toggled by tests.
	#[derive(Clone)]
	struct TestSync {
		// Shared flag read by `is_major_syncing`.
		major_syncing: Arc<AtomicBool>,
	}
1350
1351 impl TestSync {
1352 fn new() -> Self {
1353 Self { major_syncing: Arc::new(AtomicBool::new(false)) }
1354 }
1355
1356 fn with_syncing(initial: bool) -> (Self, Arc<AtomicBool>) {
1357 let flag = Arc::new(AtomicBool::new(initial));
1358 (Self { major_syncing: flag.clone() }, flag)
1359 }
1360 }
1361
	impl SyncEventStream for TestSync {
		/// Returns a stream that never yields: tests drive the handler
		/// directly instead of via sync events.
		fn event_stream(
			&self,
			_name: &'static str,
		) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
			Box::pin(futures::stream::pending())
		}
	}
1370
	impl sp_consensus::SyncOracle for TestSync {
		/// Reflects the shared flag (togglable via `with_syncing`'s handle).
		fn is_major_syncing(&self) -> bool {
			self.major_syncing.load(Ordering::Relaxed)
		}

		// Not exercised by these tests.
		fn is_offline(&self) -> bool {
			unimplemented!()
		}
	}
1380
	impl NetworkEventStream for TestNetwork {
		// The handler under test never subscribes to generic network events,
		// so this is intentionally left unimplemented.
		fn event_stream(
			&self,
			_name: &'static str,
		) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
			unimplemented!()
		}
	}
1389
	/// Mock `NotificationService` that records every notification sent.
	#[derive(Debug, Clone)]
	struct TestNotificationService {
		// `(peer, encoded payload)` pairs, in send order.
		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
	}
1394
1395 impl TestNotificationService {
1396 fn new() -> Self {
1397 Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1398 }
1399
1400 fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1401 self.sent_notifications.lock().unwrap().clone()
1402 }
1403 }
1404
	// Only the send paths and `next_event` are implemented: sends are logged
	// for assertions, everything else panics if unexpectedly called.
	#[async_trait::async_trait]
	impl NotificationService for TestNotificationService {
		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!()
		}

		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!()
		}

		// Logged; sync sends never fail in this mock.
		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
			self.sent_notifications.lock().unwrap().push((*peer, notification));
		}

		// Logged; async sends always succeed in this mock.
		async fn send_async_notification(
			&mut self,
			peer: &PeerId,
			notification: Vec<u8>,
		) -> Result<(), sc_network::error::Error> {
			self.sent_notifications.lock().unwrap().push((*peer, notification));
			Ok(())
		}

		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!()
		}

		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!()
		}

		// No events are ever produced; the event stream looks exhausted.
		async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
			None
		}

		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
			unimplemented!()
		}

		fn protocol(&self) -> &sc_network::types::ProtocolName {
			unimplemented!()
		}

		fn message_sink(
			&self,
			_peer: &PeerId,
		) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
			unimplemented!()
		}
	}
1455
	/// Minimal in-memory statement store backing the handler in tests.
	#[derive(Clone)]
	struct TestStatementStore {
		// Full store contents, keyed by statement hash.
		statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
		// Statements pending propagation; drained by `take_recent_statements`.
		recent_statements:
			Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
	}
1462
1463 impl TestStatementStore {
1464 fn new() -> Self {
1465 Self { statements: Default::default(), recent_statements: Default::default() }
1466 }
1467 }
1468
	// In-memory `StatementStore`: only the methods the gossip handler
	// exercises are implemented; the rest panic if unexpectedly called.
	impl StatementStore for TestStatementStore {
		/// All statements currently in the store (unspecified order).
		fn statements(
			&self,
		) -> sp_statement_store::Result<
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
		> {
			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
		}

		/// Drains and returns the "recent" set (statements queued for
		/// propagation).
		fn take_recent_statements(
			&self,
		) -> sp_statement_store::Result<
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
		> {
			Ok(self.recent_statements.lock().unwrap().drain().collect())
		}

		fn statement(
			&self,
			_hash: &sp_statement_store::Hash,
		) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
			unimplemented!()
		}

		/// Whether `hash` is already in the store.
		fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
			self.statements.lock().unwrap().contains_key(hash)
		}

		/// Hashes of all stored statements (unspecified order).
		fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
			self.statements.lock().unwrap().keys().cloned().collect()
		}

		/// Looks up `hashes` and lets `filter` decide per statement whether
		/// to take, skip, or abort. Returns the taken statements plus the
		/// number of input hashes processed; missing hashes count as
		/// processed, and `Abort` leaves the remainder unprocessed.
		fn statements_by_hashes(
			&self,
			hashes: &[sp_statement_store::Hash],
			filter: &mut dyn FnMut(
				&sp_statement_store::Hash,
				&[u8],
				&sp_statement_store::Statement,
			) -> FilterDecision,
		) -> sp_statement_store::Result<(
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
			usize,
		)> {
			let statements = self.statements.lock().unwrap();
			let mut result = Vec::new();
			let mut processed = 0;
			for hash in hashes {
				// Unknown hashes are consumed without producing output.
				let Some(stmt) = statements.get(hash) else {
					processed += 1;
					continue;
				};
				// The filter sees the SCALE-encoded size of each statement.
				let encoded = stmt.encode();
				match filter(hash, &encoded, stmt) {
					FilterDecision::Skip => {
						processed += 1;
					},
					FilterDecision::Take => {
						processed += 1;
						result.push((*hash, stmt.clone()));
					},
					FilterDecision::Abort => break,
				}
			}
			Ok((result, processed))
		}

		fn broadcasts(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_clear(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn broadcasts_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_clear_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn submit(
			&self,
			_statement: sp_statement_store::Statement,
			_source: sp_statement_store::StatementSource,
		) -> sp_statement_store::SubmitResult {
			unimplemented!()
		}

		fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
			unimplemented!()
		}

		fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
			unimplemented!()
		}
	}
1598
	/// Builds a `StatementHandler` wired to the test doubles above,
	/// pre-populated with one connected peer. Returns the handler together
	/// with handles to the store, the network recorder, the notification
	/// recorder, and the submission-queue receiver.
	fn build_handler() -> (
		StatementHandler<TestNetwork, TestSync>,
		TestStatementStore,
		TestNetwork,
		TestNotificationService,
		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
	) {
		let statement_store = TestStatementStore::new();
		let (queue_sender, queue_receiver) = async_channel::bounded(2);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let peer_id = PeerId::random();
		let mut peers = HashMap::new();
		peers.insert(
			peer_id,
			Peer {
				known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
				rate_limiter: PeerRateLimiter::new(
					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
					NonZeroU32::new(
						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
					)
					.expect("burst capacity is nonzero"),
				),
			},
		);

		let handler = StatementHandler {
			protocol_name: "/statement/1".into(),
			notification_service: Box::new(notification_service.clone()),
			// Pending streams/futures never fire: tests drive the handler
			// manually instead of running its event loop.
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync: TestSync::new(),
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers,
			statement_store: Arc::new(statement_store.clone()),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
		};
		(handler, statement_store, network, notification_service, queue_receiver)
	}
1656
	/// Statements already present in the store must not be re-queued for
	/// submission; only unknown statements go to the validation queue.
	#[tokio::test]
	async fn test_skips_processing_statements_that_already_in_store() {
		let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
			build_handler();

		// statement1 is pre-inserted into the store, statement2 is not.
		let mut statement1 = Statement::new();
		statement1.set_plain_data(b"statement1".to_vec());
		let hash1 = statement1.hash();

		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());

		let mut statement2 = Statement::new();
		statement2.set_plain_data(b"statement2".to_vec());
		let hash2 = statement2.hash();

		let peer_id = *handler.peers.keys().next().unwrap();

		handler.on_statements(peer_id, vec![statement1, statement2]);

		// Only the unknown statement should reach the submission queue.
		let to_submit = queue_receiver.try_recv();
		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");

		let no_more = queue_receiver.try_recv();
		assert!(no_more.is_err(), "Expected only one statement to be queued");
	}
1682
	/// Re-sending a statement that was already imported must cost the peer
	/// reputation (DUPLICATE_STATEMENT) instead of being refunded again.
	#[tokio::test]
	async fn test_reports_for_duplicate_statements() {
		let (mut handler, statement_store, network, _notification_service, queue_receiver) =
			build_handler();

		let peer_id = *handler.peers.keys().next().unwrap();

		let mut statement1 = Statement::new();
		statement1.set_plain_data(b"statement1".to_vec());

		handler.on_statements(peer_id, vec![statement1.clone()]);
		{
			// Simulate a successful import: move the queued statement into
			// the store and issue the refund the import path would give.
			let (s, _) = queue_receiver.try_recv().unwrap();
			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
		}

		// Second delivery of the same statement is now a duplicate.
		handler.on_statements(peer_id, vec![statement1]);

		let reports = network.get_reports();
		assert_eq!(
			reports,
			vec![
				(peer_id, rep::ANY_STATEMENT),
				(peer_id, rep::ANY_STATEMENT_REFUND),
				(peer_id, rep::DUPLICATE_STATEMENT)
			],
			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
			reports
		);
	}
1715
1716 #[tokio::test]
1717 async fn test_splits_large_batches_into_smaller_chunks() {
1718 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1719 build_handler();
1720
1721 let num_statements = 30;
1722 let statement_size = 100 * 1024; for i in 0..num_statements {
1724 let mut statement = Statement::new();
1725 let mut data = vec![0u8; statement_size];
1726 data[0] = i as u8;
1727 statement.set_plain_data(data);
1728 let hash = statement.hash();
1729 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1730 }
1731
1732 handler.propagate_statements().await;
1733
1734 let sent = notification_service.get_sent_notifications();
1735 let mut total_statements_sent = 0;
1736 assert!(
1737 sent.len() == 3,
1738 "Expected batch to be split into 3 chunks, but got {} chunks",
1739 sent.len()
1740 );
1741 for (_peer, notification) in sent.iter() {
1742 assert!(
1743 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1744 "Notification size {} exceeds limit {}",
1745 notification.len(),
1746 MAX_STATEMENT_NOTIFICATION_SIZE
1747 );
1748 if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1749 total_statements_sent += stmts.len();
1750 }
1751 }
1752
1753 assert_eq!(
1754 total_statements_sent, num_statements,
1755 "Expected all {} statements to be sent, but only {} were sent",
1756 num_statements, total_statements_sent
1757 );
1758 }
1759
	/// Statements that can never fit into a single notification are skipped
	/// during propagation, while the normally-sized statements interleaved
	/// with them are still sent.
	#[tokio::test]
	async fn test_skips_only_oversized_statements() {
		let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
			build_handler();

		let mut statement1 = Statement::new();
		statement1.set_plain_data(vec![1u8; 100]);
		let hash1 = statement1.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash1, statement1.clone());

		// Grossly oversized: 100x the whole notification limit.
		let mut oversized1 = Statement::new();
		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
		let hash_oversized1 = oversized1.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_oversized1, oversized1);

		let mut statement2 = Statement::new();
		statement2.set_plain_data(vec![3u8; 100]);
		let hash2 = statement2.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash2, statement2.clone());

		// Borderline oversized: payload alone already fills the limit, so
		// the encoded statement cannot fit either.
		let mut oversized2 = Statement::new();
		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
		let hash_oversized2 = oversized2.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_oversized2, oversized2);

		let mut statement3 = Statement::new();
		statement3.set_plain_data(vec![5u8; 100]);
		let hash3 = statement3.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash3, statement3.clone());

		handler.propagate_statements().await;

		let sent = notification_service.get_sent_notifications();

		// Decode everything that was sent and compare the hash sets.
		let mut sent_hashes = sent
			.iter()
			.flat_map(|(_peer, notification)| {
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect::<Vec<_>>();
		sent_hashes.sort();
		let mut expected_hashes = vec![hash1, hash2, hash3];
		expected_hashes.sort();
		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
	}
1826
	/// Like `build_handler`, but the handler starts with no connected peers;
	/// tests add peers via `handle_notification_event` to exercise the
	/// initial-sync path.
	fn build_handler_no_peers() -> (
		StatementHandler<TestNetwork, TestSync>,
		TestStatementStore,
		TestNetwork,
		TestNotificationService,
	) {
		let statement_store = TestStatementStore::new();
		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();

		let handler = StatementHandler {
			protocol_name: "/statement/1".into(),
			notification_service: Box::new(notification_service.clone()),
			// Pending streams/futures never fire: tests drive the handler
			// manually instead of running its event loop.
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync: TestSync::new(),
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers: HashMap::new(),
			statement_store: Arc::new(statement_store.clone()),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
		};
		(handler, statement_store, network, notification_service)
	}
1867
	/// A newly connected peer is drip-fed the whole store contents through
	/// multiple size-bounded initial-sync bursts, eventually receiving every
	/// statement exactly once.
	#[tokio::test]
	async fn test_initial_sync_burst_single_peer() {
		let (mut handler, statement_store, _network, notification_service) =
			build_handler_no_peers();

		let num_statements = 200;
		// 100 KiB per statement: far more data than fits in one burst.
		let statement_size = 100 * 1024;
		let mut expected_hashes = Vec::new();
		for i in 0..num_statements {
			let mut statement = Statement::new();
			let mut data = vec![0u8; statement_size];
			// Make each statement (and thus each hash) unique.
			data[0] = (i % 256) as u8;
			data[1] = (i / 256) as u8;
			statement.set_plain_data(data);
			let hash = statement.hash();
			expected_hashes.push(hash);
			statement_store.statements.lock().unwrap().insert(hash, statement);
		}

		let peer_id = PeerId::random();

		// Opening a stream registers the peer and schedules its initial sync.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		assert!(handler.peers.contains_key(&peer_id));
		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
		assert_eq!(handler.initial_sync_peer_queue.len(), 1);

		// Drive bursts until the sync completes, with a safety bound.
		let mut burst_count = 0;
		while handler.pending_initial_syncs.contains_key(&peer_id) {
			handler.process_initial_sync_burst().await;
			burst_count += 1;
			assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
		}

		assert!(
			burst_count >= 10,
			"Expected multiple bursts for 200 statements of 100KB each, got {}",
			burst_count
		);

		// Everything sent must have gone to this peer; collect the hashes.
		let sent = notification_service.get_sent_notifications();
		let mut sent_hashes: Vec<_> = sent
			.iter()
			.flat_map(|(peer, notification)| {
				assert_eq!(*peer, peer_id);
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect();
		sent_hashes.sort();
		expected_hashes.sort();

		assert_eq!(
			sent_hashes.len(),
			expected_hashes.len(),
			"Expected {} statements to be sent, got {}",
			expected_hashes.len(),
			sent_hashes.len()
		);
		assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");

		// The sync bookkeeping must be fully cleaned up afterwards.
		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
		assert!(handler.initial_sync_peer_queue.is_empty());
	}
1950
1951 #[tokio::test]
1952 async fn test_initial_sync_burst_multiple_peers_round_robin() {
1953 let (mut handler, statement_store, _network, notification_service) =
1954 build_handler_no_peers();
1955
1956 let num_statements = 200;
1958 let statement_size = 100 * 1024; let mut expected_hashes = Vec::new();
1960 for i in 0..num_statements {
1961 let mut statement = Statement::new();
1962 let mut data = vec![0u8; statement_size];
1963 data[0] = (i % 256) as u8;
1964 data[1] = (i / 256) as u8;
1965 statement.set_plain_data(data);
1966 let hash = statement.hash();
1967 expected_hashes.push(hash);
1968 statement_store.statements.lock().unwrap().insert(hash, statement);
1969 }
1970
1971 let peer1 = PeerId::random();
1973 let peer2 = PeerId::random();
1974 let peer3 = PeerId::random();
1975
1976 for peer in [peer1, peer2, peer3] {
1978 handler
1979 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
1980 peer,
1981 direction: sc_network::service::traits::Direction::Inbound,
1982 handshake: vec![],
1983 negotiated_fallback: None,
1984 })
1985 .await;
1986 }
1987
1988 assert_eq!(handler.peers.len(), 3);
1990 assert_eq!(handler.pending_initial_syncs.len(), 3);
1991 assert_eq!(handler.initial_sync_peer_queue.len(), 3);
1992
1993 let mut peer_burst_order = Vec::new();
1995 let mut burst_count = 0;
1996
1997 while !handler.pending_initial_syncs.is_empty() {
1998 if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
2000 peer_burst_order.push(next_peer);
2001 }
2002 handler.process_initial_sync_burst().await;
2003 burst_count += 1;
2004 assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
2006 }
2007
2008 assert!(
2011 burst_count >= 30,
2012 "Expected many bursts for 3 peers with 200 statements each, got {}",
2013 burst_count
2014 );
2015
2016 assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
2018 assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
2020 assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
2021 assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
2022 assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
2024 assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
2025 assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
2026
2027 let sent = notification_service.get_sent_notifications();
2029 let mut peer1_hashes: Vec<_> = sent
2030 .iter()
2031 .filter(|(peer, _)| *peer == peer1)
2032 .flat_map(|(_, notification)| {
2033 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2034 })
2035 .map(|s| s.hash())
2036 .collect();
2037 let mut peer2_hashes: Vec<_> = sent
2038 .iter()
2039 .filter(|(peer, _)| *peer == peer2)
2040 .flat_map(|(_, notification)| {
2041 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2042 })
2043 .map(|s| s.hash())
2044 .collect();
2045 let mut peer3_hashes: Vec<_> = sent
2046 .iter()
2047 .filter(|(peer, _)| *peer == peer3)
2048 .flat_map(|(_, notification)| {
2049 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
2050 })
2051 .map(|s| s.hash())
2052 .collect();
2053
2054 peer1_hashes.sort();
2055 peer2_hashes.sort();
2056 peer3_hashes.sort();
2057 expected_hashes.sort();
2058
2059 assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
2060 assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
2061 assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
2062
2063 assert!(handler.pending_initial_syncs.is_empty());
2065 assert!(handler.initial_sync_peer_queue.is_empty());
2066 }
2067
2068 #[tokio::test]
2069 async fn test_send_statements_in_chunks_exact_max_size() {
2070 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
2071 build_handler();
2072
2073 let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
2087 let num_statements: usize = 100;
2088 let per_statement_overhead = 1 + 1 + 8 + 1 + 2; let total_overhead = per_statement_overhead * num_statements;
2090 let total_data_size = max_size - total_overhead;
2091 let per_statement_data_size = total_data_size / num_statements;
2092 let remainder = total_data_size % num_statements;
2093
2094 let mut expected_hashes = Vec::with_capacity(num_statements);
2095 let mut total_encoded_size = 0;
2096
2097 for i in 0..num_statements {
2098 let mut statement = Statement::new();
2099 let extra = if i < remainder { 1 } else { 0 };
2101 let mut data = vec![42u8; per_statement_data_size + extra];
2102 data[0] = i as u8;
2104 data[1] = (i >> 8) as u8;
2105 statement.set_plain_data(data);
2106
2107 total_encoded_size += statement.encoded_size();
2108
2109 let hash = statement.hash();
2110 expected_hashes.push(hash);
2111 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
2112 }
2113
2114 assert!(
2116 total_encoded_size == max_size,
2117 "Total encoded size {} should be <= max_size {}",
2118 total_encoded_size,
2119 max_size
2120 );
2121
2122 handler.propagate_statements().await;
2123
2124 let sent = notification_service.get_sent_notifications();
2125
2126 assert_eq!(
2128 sent.len(),
2129 1,
2130 "Expected 1 notification for all {} statements, but got {}",
2131 num_statements,
2132 sent.len()
2133 );
2134
2135 let (_peer, notification) = &sent[0];
2136 assert!(
2137 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
2138 "Notification size {} exceeds limit {}",
2139 notification.len(),
2140 MAX_STATEMENT_NOTIFICATION_SIZE
2141 );
2142
2143 let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
2144 assert_eq!(
2145 decoded.len(),
2146 num_statements,
2147 "Expected {} statements in the notification",
2148 num_statements
2149 );
2150
2151 let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
2153 expected_hashes.sort();
2154 received_hashes.sort();
2155 assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
2156 }
2157
	/// The size filter used by initial sync must agree with the sending
	/// path: when a second statement would overflow the payload limit it is
	/// deferred to the next burst rather than dropped.
	#[tokio::test]
	async fn test_initial_sync_burst_size_limit_consistency() {
		let (mut handler, statement_store, _network, notification_service) =
			build_handler_no_peers();

		let payload_limit = max_statement_payload_size();

		// First statement takes a bit more than half of the payload budget.
		let first_stmt_data_size = payload_limit / 2 + 10;
		let mut stmt1 = Statement::new();
		stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
		let stmt1_encoded_size = stmt1.encoded_size();

		let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
		// Aim slightly past the remaining budget so the pair exceeds the
		// limit together.
		let target_stmt2_encoded = remaining + 3;
		// NOTE(review): the `- 4` presumably accounts for per-statement
		// encoding overhead around the plain data — confirm against the
		// Statement SCALE encoding.
		let stmt2_data_size = target_stmt2_encoded.saturating_sub(4);
		let mut stmt2 = Statement::new();
		stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
		let stmt2_encoded_size = stmt2.encoded_size();

		let total_encoded = stmt1_encoded_size + stmt2_encoded_size;

		assert!(
			total_encoded > payload_limit,
			"Total {} should exceed payload_limit {} so filter rejects second statement",
			total_encoded,
			payload_limit
		);

		let hash1 = stmt1.hash();
		let hash2 = stmt2.hash();
		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
		statement_store.statements.lock().unwrap().insert(hash2, stmt2);

		let peer_id = PeerId::random();

		// Opening a stream schedules the peer's initial sync of both hashes.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);

		// First burst: only one statement fits within the payload limit.
		handler.process_initial_sync_burst().await;

		let sent = notification_service.get_sent_notifications();
		assert_eq!(sent.len(), 1, "First burst should send one notification");

		let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
		assert_eq!(decoded.len(), 1, "First notification should contain one statement");

		// Store iteration order is unspecified, so either statement may be
		// sent first.
		let sent_hash = decoded[0].hash();
		assert!(
			sent_hash == hash1 || sent_hash == hash2,
			"Sent statement should be one of the two created"
		);

		// The other statement must still be pending, not dropped.
		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);

		// Second burst delivers the remaining statement.
		handler.process_initial_sync_burst().await;

		let sent = notification_service.get_sent_notifications();
		assert_eq!(sent.len(), 2, "Second burst should send another notification");

		let mut sent_hashes: Vec<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect();
		sent_hashes.sort();
		let mut expected_hashes = vec![hash1, hash2];
		expected_hashes.sort();
		assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");

		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
	}
2265
2266 #[tokio::test]
2267 async fn test_peer_disconnected_on_flooding() {
2268 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2269 build_handler();
2270
2271 let peer_id = *handler.peers.keys().next().unwrap();
2272
2273 let mut flood_statements = Vec::new();
2274 for i in 0..600_000 {
2275 let mut statement = Statement::new();
2276 statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2277 flood_statements.push(statement);
2278 }
2279
2280 handler.on_statements(peer_id, flood_statements);
2281
2282 let reports = network.get_reports();
2283 assert!(
2284 reports
2285 .iter()
2286 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2287 "Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2288 reports
2289 );
2290
2291 let disconnected = network.get_disconnected_peers();
2292 assert!(
2293 disconnected.contains(&peer_id),
2294 "Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2295 peer_id,
2296 disconnected
2297 );
2298
2299 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2301 assert!(
2302 !handler.pending_initial_syncs.contains_key(&peer_id),
2303 "Peer should be removed from pending_initial_syncs"
2304 );
2305 assert!(
2306 !handler.initial_sync_peer_queue.contains(&peer_id),
2307 "Peer should be removed from initial_sync_peer_queue"
2308 );
2309 }
2310
	#[tokio::test]
	async fn test_legitimate_traffic_not_flagged() {
		// Drives a steady 5k statements every 100ms (~50k/s) for 5 seconds of
		// wall-clock time and verifies the per-peer rate limiter never reports
		// the peer as flooding and never disconnects it.
		// NOTE(review): this uses real time (std::time::Instant + tokio sleep),
		// so the effective rate depends on scheduler latency; presumably the
		// pace is meant to sit at/below DEFAULT_STATEMENTS_PER_SECOND — confirm.
		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
			build_handler();

		let peer_id = *handler.peers.keys().next().unwrap();

		let start = std::time::Instant::now();
		let duration = std::time::Duration::from_secs(5);
		let mut counter = 0u32;

		while start.elapsed() < duration {
			let mut statements = Vec::new();
			for i in 0..5_000 {
				let mut statement = Statement::new();
				// Payload mixes the monotonically increasing `counter` with the
				// in-batch index so every statement is unique across batches.
				statement.set_plain_data(vec![
					counter as u8,
					(counter >> 8) as u8,
					(counter >> 16) as u8,
					i as u8,
				]);
				statements.push(statement);
				counter = counter.wrapping_add(1);
			}

			handler.on_statements(peer_id, statements);

			// Pace the batches at 10 per second.
			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
		}

		let reports = network.get_reports();
		assert!(
			!reports
				.iter()
				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
			"Legitimate traffic should not trigger flooding detection. Reports: {:?}",
			reports
		);

		let disconnected = network.get_disconnected_peers();
		assert!(
			!disconnected.contains(&peer_id),
			"Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
			disconnected
		);

		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
	}
2359
2360 #[tokio::test]
2361 async fn test_just_over_rate_limit_triggers_flooding() {
2362 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2363 build_handler();
2364
2365 let peer_id = *handler.peers.keys().next().unwrap();
2366
2367 let mut statements = Vec::new();
2368 for i in 0..260_000 {
2369 let mut statement = Statement::new();
2370 statement.set_plain_data(vec![
2371 i as u8,
2372 (i >> 8) as u8,
2373 (i >> 16) as u8,
2374 (i >> 24) as u8,
2375 ]);
2376 statements.push(statement);
2377 }
2378
2379 handler.on_statements(peer_id, statements);
2380
2381 let reports = network.get_reports();
2382 let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2383 assert!(
2384 reports
2385 .iter()
2386 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2387 "Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2388 expected_burst,
2389 reports
2390 );
2391
2392 let disconnected = network.get_disconnected_peers();
2393 assert!(
2394 disconnected.contains(&peer_id),
2395 "Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2396 disconnected
2397 );
2398
2399 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2400 }
2401
2402 #[tokio::test]
2403 async fn test_burst_of_250k_statements_allowed() {
2404 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2405 build_handler();
2406
2407 let peer_id = *handler.peers.keys().next().unwrap();
2408
2409 let mut statements = Vec::new();
2410 for i in 0..250_000 {
2411 let mut statement = Statement::new();
2412 statement.set_plain_data(vec![
2413 i as u8,
2414 (i >> 8) as u8,
2415 (i >> 16) as u8,
2416 (i >> 24) as u8,
2417 ]);
2418 statements.push(statement);
2419 }
2420
2421 handler.on_statements(peer_id, statements);
2422
2423 let reports = network.get_reports();
2424 assert!(
2425 !reports
2426 .iter()
2427 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2428 "250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2429 reports
2430 );
2431
2432 assert!(
2433 handler.peers.contains_key(&peer_id),
2434 "Peer should still be connected after 250k burst"
2435 );
2436 }
2437
	#[tokio::test]
	async fn test_sustained_rate_above_limit_triggers_flooding() {
		// Feeds ~300k statements/sec (30k every 100ms) and expects the per-peer
		// rate limiter to flag the peer as flooding once the burst allowance is
		// exhausted, after which the peer must be disconnected and forgotten.
		// NOTE(review): runs on real wall-clock time (up to 5s); the exact
		// iteration at which flooding triggers depends on scheduler timing.
		let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
			build_handler();

		let peer_id = *handler.peers.keys().next().unwrap();

		let mut counter = 0u32;

		let start = std::time::Instant::now();
		let duration = std::time::Duration::from_secs(5);

		let mut flooding_detected = false;
		while start.elapsed() < duration {
			let mut statements = Vec::new();
			for i in 0..30_000 {
				let mut statement = Statement::new();
				// Payload mixes the global counter with the in-batch index so
				// every statement is distinct across batches.
				statement.set_plain_data(vec![
					counter as u8,
					(counter >> 8) as u8,
					(counter >> 16) as u8,
					i as u8,
				]);
				statements.push(statement);
				counter = counter.wrapping_add(1);
			}

			handler.on_statements(peer_id, statements);

			// Stop as soon as the flooding reputation change is observed.
			let reports = network.get_reports();
			if reports
				.iter()
				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
			{
				flooding_detected = true;
				break;
			}

			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
		}

		assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");

		let disconnected = network.get_disconnected_peers();
		assert!(
			disconnected.contains(&peer_id),
			"Peer should be disconnected after sustained high rate. Disconnected: {:?}",
			disconnected
		);

		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
	}
2491
2492 #[test]
2495 fn major_sync_defers_peers_and_handles_disconnect() {
2496 let (sync, _flag) = TestSync::with_syncing(true);
2497 let network = TestNetwork::new();
2498 let notification_service = TestNotificationService::new();
2499 let statement_store = TestStatementStore::new();
2500 let (queue_sender, _queue_receiver) = async_channel::bounded(100);
2501
2502 let mut handler = StatementHandler {
2503 protocol_name: "/statement/1".into(),
2504 notification_service: Box::new(notification_service),
2505 propagate_timeout: (Box::pin(futures::stream::pending())
2506 as Pin<Box<dyn Stream<Item = ()> + Send>>)
2507 .fuse(),
2508 pending_statements: FuturesUnordered::new(),
2509 pending_statements_peers: HashMap::new(),
2510 network: network.clone(),
2511 sync,
2512 sync_event_stream: (Box::pin(futures::stream::pending())
2513 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2514 .fuse(),
2515 peers: HashMap::new(),
2516 statement_store: Arc::new(statement_store),
2517 queue_sender,
2518 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2519 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2520 metrics: None,
2521 initial_sync_timeout: Box::pin(futures::future::pending()),
2522 pending_initial_syncs: HashMap::new(),
2523 initial_sync_peer_queue: VecDeque::new(),
2524 deferred_peers: HashSet::new(),
2525 dropped_statements_during_sync: false,
2526 sync_recovery_peer: None,
2527 sync_recovery_readd_timeout: Box::pin(pending().fuse()),
2528 };
2529
2530 let peer1 = PeerId::random();
2531 let peer2 = PeerId::random();
2532 let peer3 = PeerId::random();
2533
2534 handler.handle_sync_event(SyncEvent::PeerConnected(peer1));
2535 handler.handle_sync_event(SyncEvent::PeerConnected(peer2));
2536 handler.handle_sync_event(SyncEvent::PeerConnected(peer3));
2537
2538 assert!(network.get_added_reserved().is_empty());
2540 assert!(network.get_removed_reserved().is_empty());
2541 assert_eq!(handler.deferred_peers.len(), 3);
2542
2543 handler.handle_sync_event(SyncEvent::PeerDisconnected(peer1));
2545 assert_eq!(handler.deferred_peers.len(), 2);
2546 assert!(!handler.deferred_peers.contains(&peer1), "disconnected peer must leave buffer");
2547 assert!(handler.deferred_peers.contains(&peer2));
2548 assert!(handler.deferred_peers.contains(&peer3));
2549 assert!(network.get_removed_reserved().is_empty(), "no remove call for buffered peer");
2550 }
2551
	#[test]
	fn deferred_peers_flushed_on_sync_end_without_remove() {
		// Once major sync ends (flag flipped to false), `drain_deferred_peers`
		// must flush all buffered peers into the reserved set with a single add
		// call, and must never issue a reserved-set removal.
		let (sync, flag) = TestSync::with_syncing(true);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let statement_store = TestStatementStore::new();
		let (queue_sender, _queue_receiver) = async_channel::bounded(100);

		// Pre-populate the deferred buffer as if two peers connected mid-sync.
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let mut deferred = HashSet::new();
		deferred.insert(peer1);
		deferred.insert(peer2);

		// Hand-assembled handler with inert streams/timeouts; only the explicit
		// drain call below drives it.
		let mut handler = StatementHandler {
			protocol_name: "/statement/1".into(),
			notification_service: Box::new(notification_service),
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync,
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers: HashMap::new(),
			statement_store: Arc::new(statement_store),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: deferred,
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		};

		// End major sync, then drain.
		flag.store(false, Ordering::Relaxed);
		handler.drain_deferred_peers();

		assert!(handler.deferred_peers.is_empty());

		// Exactly one add_peers_to_reserved_set call containing both peers'
		// p2p multiaddresses.
		let added = network.get_added_reserved();
		assert_eq!(added.len(), 1);
		let added_addrs = &added[0];
		let expected_addr1: sc_network::Multiaddr =
			iter::once(multiaddr::Protocol::P2p(peer1.into())).collect();
		let expected_addr2: sc_network::Multiaddr =
			iter::once(multiaddr::Protocol::P2p(peer2.into())).collect();
		assert!(added_addrs.contains(&expected_addr1), "peer1 must be in added set");
		assert!(added_addrs.contains(&expected_addr2), "peer2 must be in added set");

		assert!(network.get_removed_reserved().is_empty());
	}
2611
2612 #[tokio::test]
2613 async fn sync_recovery_schedules_remove_for_one_connected_peer() {
2614 let network = TestNetwork::new();
2615 let notification_service = TestNotificationService::new();
2616 let (sync, _flag) = TestSync::with_syncing(false);
2617 let (queue_sender, _) = async_channel::bounded(2);
2618 let statement_store = TestStatementStore::new();
2619
2620 let connected_peer = PeerId::random();
2621
2622 let mut peers = HashMap::new();
2623 peers.insert(
2624 connected_peer,
2625 Peer {
2626 known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
2627 rate_limiter: PeerRateLimiter::new(
2628 NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2629 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2630 NonZeroU32::new(
2631 DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
2632 )
2633 .expect("burst capacity is nonzero"),
2634 ),
2635 },
2636 );
2637
2638 let mut handler = StatementHandler {
2639 protocol_name: "/statement/1".into(),
2640 notification_service: Box::new(notification_service),
2641 propagate_timeout: (Box::pin(futures::stream::pending())
2642 as Pin<Box<dyn Stream<Item = ()> + Send>>)
2643 .fuse(),
2644 pending_statements: FuturesUnordered::new(),
2645 pending_statements_peers: HashMap::new(),
2646 network: network.clone(),
2647 sync,
2648 sync_event_stream: (Box::pin(futures::stream::pending())
2649 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2650 .fuse(),
2651 peers,
2652 statement_store: Arc::new(statement_store),
2653 queue_sender,
2654 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2655 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2656 metrics: None,
2657 initial_sync_timeout: Box::pin(futures::future::pending()),
2658 pending_initial_syncs: HashMap::new(),
2659 initial_sync_peer_queue: VecDeque::new(),
2660 deferred_peers: HashSet::new(),
2661 dropped_statements_during_sync: true,
2662 sync_recovery_peer: None,
2663 sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
2664 };
2665
2666 handler.start_sync_recovery();
2667
2668 {
2670 let removed = network.removed_reserved.lock().unwrap();
2671 assert_eq!(
2672 removed.len(),
2673 1,
2674 "Expected exactly one remove_peers_from_reserved_set call"
2675 );
2676 assert!(removed[0].contains(&connected_peer));
2677 }
2678
2679 assert_eq!(handler.sync_recovery_peer, Some(connected_peer));
2681
2682 handler.try_readd_sync_recovery_peer();
2685 assert!(handler.sync_recovery_peer.is_none());
2686 {
2687 let added = network.added_reserved.lock().unwrap();
2688 assert_eq!(added.len(), 1);
2689 let expected_addr: multiaddr::Multiaddr =
2690 iter::once(multiaddr::Protocol::P2p(connected_peer.into())).collect();
2691 assert!(added[0].contains(&expected_addr));
2692 }
2693
2694 {
2697 let peer2 = PeerId::random();
2698 handler.sync_recovery_peer = Some(peer2);
2699 handler.start_sync_recovery();
2700 assert_eq!(
2701 handler.sync_recovery_peer,
2702 Some(peer2),
2703 "Re-entry guard: recovery peer must not change on second call"
2704 );
2705 assert_eq!(
2706 network.removed_reserved.lock().unwrap().len(),
2707 1,
2708 "Re-entry guard: no extra remove call while recovery is in flight"
2709 );
2710 }
2711 }
2712
2713 #[tokio::test]
2714 async fn sync_recovery_gated_by_dropped_statements_flag() {
2715 let make_peer = || Peer {
2716 known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
2717 rate_limiter: PeerRateLimiter::new(
2718 NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2719 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2720 NonZeroU32::new(
2721 DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
2722 )
2723 .expect("burst capacity is nonzero"),
2724 ),
2725 };
2726
2727 let make_handler =
2728 |network: TestNetwork, dropped: bool| -> StatementHandler<TestNetwork, TestSync> {
2729 let (sync, _) = TestSync::with_syncing(false);
2730 let (queue_sender, _) = async_channel::bounded(2);
2731 let mut peers = HashMap::new();
2732 peers.insert(PeerId::random(), make_peer());
2733 StatementHandler {
2734 protocol_name: "/statement/1".into(),
2735 notification_service: Box::new(TestNotificationService::new()),
2736 propagate_timeout: (Box::pin(futures::stream::pending())
2737 as Pin<Box<dyn Stream<Item = ()> + Send>>)
2738 .fuse(),
2739 pending_statements: FuturesUnordered::new(),
2740 pending_statements_peers: HashMap::new(),
2741 network,
2742 sync,
2743 sync_event_stream: (Box::pin(futures::stream::pending())
2744 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2745 .fuse(),
2746 peers,
2747 statement_store: Arc::new(TestStatementStore::new()),
2748 queue_sender,
2749 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2750 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2751 metrics: None,
2752 initial_sync_timeout: Box::pin(futures::future::pending()),
2753 pending_initial_syncs: HashMap::new(),
2754 initial_sync_peer_queue: VecDeque::new(),
2755 deferred_peers: HashSet::new(),
2756 dropped_statements_during_sync: dropped,
2757 sync_recovery_peer: None,
2758 sync_recovery_readd_timeout: Box::pin(pending().fuse()),
2759 }
2760 };
2761
2762 let net = TestNetwork::new();
2764 let mut handler = make_handler(net.clone(), false);
2765 handler.start_sync_recovery();
2766 assert!(handler.sync_recovery_peer.is_none());
2767 assert!(net.get_removed_reserved().is_empty());
2768
2769 let net2 = TestNetwork::new();
2771 let mut handler2 = make_handler(net2.clone(), true);
2772 handler2.start_sync_recovery();
2773 assert!(handler2.sync_recovery_peer.is_some());
2774 assert_eq!(net2.get_removed_reserved().len(), 1);
2775 }
2776}