1mod affinity;
30
31use crate::config::*;
32
33use affinity::AffinityFilter;
34use codec::{Compact, Decode, Encode, MaxEncodedLen};
35use futures::{
36 channel::oneshot,
37 future::{pending, FusedFuture},
38 prelude::*,
39 stream::FuturesUnordered,
40};
41use governor::{
42 clock::DefaultClock,
43 state::{InMemoryState, NotKeyed},
44 Quota, RateLimiter,
45};
46use prometheus_endpoint::{
47 exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
48 Registry, U64,
49};
50use rand::seq::IteratorRandom;
51use sc_network::{
52 config::{NonReservedPeerMode, SetConfig},
53 error, multiaddr,
54 peer_store::PeerStoreProvider,
55 service::{
56 traits::{NotificationEvent, NotificationService, ValidationResult},
57 NotificationMetrics,
58 },
59 types::ProtocolName,
60 utils::{interval, LruHashSet},
61 NetworkBackend, NetworkEventStream, NetworkPeers,
62};
63use sc_network_sync::{SyncEvent, SyncEventStream};
64use sc_network_types::PeerId;
65use sp_runtime::traits::Block as BlockT;
66use sp_statement_store::{
67 FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
68};
69use std::{
70 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
71 iter,
72 num::{NonZeroU32, NonZeroUsize},
73 pin::Pin,
74 sync::Arc,
75 time::Instant,
76};
77use tokio::time::timeout;
78pub mod config;
79
/// A batch of statements as exchanged over the network.
pub type Statements = Vec<Statement>;
82
/// Statement protocol version negotiated with a remote peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerProtocolVersion {
	/// Legacy protocol: notifications carry a bare SCALE-encoded statement list.
	V1,
	/// Current protocol: notifications carry a [`StatementMessage`] envelope.
	V2,
}
91
92impl PeerProtocolVersion {
93 fn envelope_overhead(&self) -> usize {
95 match self {
96 PeerProtocolVersion::V1 => V1_ENVELOPE_OVERHEAD,
97 PeerProtocolVersion::V2 => V2_ENVELOPE_OVERHEAD,
98 }
99 }
100}
101
/// Wire message for statement protocol v2.
#[derive(Debug, Encode, Decode)]
enum StatementMessage {
	/// A batch of statements for the receiver to validate and import.
	#[codec(index = 0)]
	Statements(Vec<Statement>),
	/// Announces the sender's topic-affinity filter to the receiver.
	#[codec(index = 1)]
	ExplicitTopicAffinity(AffinityFilter),
}
110
/// SCALE variant index of `StatementMessage::Statements`; must stay in sync with
/// the `#[codec(index = 0)]` attribute on that variant.
const STATEMENTS_VARIANT_INDEX: u8 = 0;
113
114impl StatementMessage {
115 fn encode_statement_refs(statements: &[&Statement]) -> Vec<u8> {
118 let mut out = Vec::new();
119 STATEMENTS_VARIANT_INDEX.encode_to(&mut out);
120 statements.encode_to(&mut out);
121 out
122 }
123}
124
/// Future resolving with the store's verdict once a queued statement was processed.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
127
/// Reputation changes applied to peers based on the messages they send us.
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Small up-front cost charged for any received statement.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Refund of [`ANY_STATEMENT`] once the statement turned out to be known/benign.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
	/// Reward for a statement that was accepted as new by the store.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
	/// Penalty for a statement that failed validation.
	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
	/// Penalty for resending a statement we already received from the same peer.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Fatal penalty for exceeding the statement rate limit.
	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
	/// Penalty for a message that could not be decoded.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad statement message");
}
148
/// Log target used by this module.
const LOG_TARGET: &str = "statement-gossip";
/// Protocol name suffix for statement protocol v2 (envelope messages).
const STATEMENT_PROTOCOL_V2: &str = "statement/2";
/// Protocol name suffix for the legacy v1 protocol (bare statement lists).
const STATEMENT_PROTOCOL_V1: &str = "statement/1";
/// Upper bound on a single notification send before it is treated as failed.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval between initial-sync bursts towards newly connected peers.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
/// Interval at which pending topic-affinity filters are applied to peers.
const PENDING_AFFINITIES_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// Delay before the peer that was disconnected for sync recovery is re-added.
const SYNC_RECOVERY_READD_DELAY: std::time::Duration = std::time::Duration::from_secs(60);
163
/// Prometheus metrics for the statement gossip handler.
struct Metrics {
	/// Statements propagated to peers, counted once per recipient.
	propagated_statements: Counter<U64>,
	/// Received statements that were already present in the statement store.
	known_statements_received: Counter<U64>,
	/// Oversized statements skipped during gossip.
	skipped_oversized_statements: Counter<U64>,
	/// Distribution of chunk sizes used when propagating statements.
	propagated_statements_chunks: Histogram,
	/// Pending statement validations, sampled once per propagation tick.
	pending_statements: Gauge<U64>,
	/// Statements dropped due to the `MAX_PENDING_STATEMENTS` limit.
	ignored_statements: Counter<U64>,
	/// Peers currently connected on the statement protocol.
	peers_connected: Gauge<U64>,
	/// Total statements received from peers.
	statements_received: Counter<U64>,
	/// Total bytes sent for statement protocol messages.
	bytes_sent_total: Counter<U64>,
	/// Total bytes received for statement protocol messages.
	bytes_received_total: Counter<U64>,
	/// Time spent sending statement messages to peers.
	sent_latency_seconds: Histogram,
	/// Statements sent during initial-sync bursts to newly connected peers.
	initial_sync_statements_sent: Counter<U64>,
	/// Initial-sync burst rounds attempted.
	initial_sync_bursts_total: Counter<U64>,
	/// Peers currently being served by initial sync.
	initial_sync_peers_active: Gauge<U64>,
	/// Per-peer duration of initial sync until completion or disconnect.
	initial_sync_duration_seconds: Histogram,
	/// Peers disconnected for exceeding statement rate limits.
	statement_flooding_detected: Counter<U64>,
}
182
impl Metrics {
	/// Registers all statement-gossip metrics with the given Prometheus registry.
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			propagated_statements: register(
				Counter::new(
					"substrate_sync_propagated_statements",
					"Total statements propagated to peers, counted once per recipient (a statement sent to N peers increments by N)",
				)?,
				r,
			)?,
			known_statements_received: register(
				Counter::new(
					"substrate_sync_known_statement_received",
					"Number of statements received via gossiping that were already in the statement store",
				)?,
				r,
			)?,
			skipped_oversized_statements: register(
				Counter::new(
					"substrate_sync_skipped_oversized_statements",
					"Number of oversized statements that were skipped to be gossiped",
				)?,
				r,
			)?,
			propagated_statements_chunks: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_propagated_statements_chunks",
						"Distribution of chunk sizes when propagating statements",
					)
					// Powers of two from 1 up to 8192.
					.buckets(exponential_buckets(1.0, 2.0, 14)?),
				)?,
				r,
			)?,
			pending_statements: register(
				Gauge::new(
					"substrate_sync_pending_statement_validations",
					"Number of pending statement validations, sampled once per propagation tick",
				)?,
				r,
			)?,
			ignored_statements: register(
				Counter::new(
					"substrate_sync_ignored_statements",
					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
				)?,
				r,
			)?,
			peers_connected: register(
				Gauge::new(
					"substrate_sync_statement_peers_connected",
					"Number of peers connected using the statement protocol",
				)?,
				r,
			)?,
			statements_received: register(
				Counter::new(
					"substrate_sync_statements_received",
					"Total number of statements received from peers",
				)?,
				r,
			)?,
			bytes_sent_total: register(
				Counter::new(
					"substrate_sync_statement_bytes_sent_total",
					"Total bytes sent for statement protocol messages",
				)?,
				r,
			)?,
			bytes_received_total: register(
				Counter::new(
					"substrate_sync_statement_bytes_received_total",
					"Total bytes received for statement protocol messages (includes bytes from notifications that are later discarded — e.g. while major-syncing)",
				)?,
				r,
			)?,
			sent_latency_seconds: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_statement_sent_latency_seconds",
						"Time to send statement messages to peers",
					)
					// Sub-microsecond up to one second.
					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
				)?,
				r,
			)?,
			initial_sync_statements_sent: register(
				Counter::new(
					"substrate_sync_initial_sync_statements_sent",
					"Total statements sent during initial sync bursts to newly connected peers",
				)?,
				r,
			)?,
			initial_sync_bursts_total: register(
				Counter::new(
					"substrate_sync_initial_sync_bursts_total",
					"Total initial-sync burst rounds attempted (includes rounds that return early with no hashes left)",
				)?,
				r,
			)?,
			initial_sync_peers_active: register(
				Gauge::new(
					"substrate_sync_initial_sync_peers_active",
					"Number of peers currently being synced via initial sync",
				)?,
				r,
			)?,
			initial_sync_duration_seconds: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_initial_sync_duration_seconds",
						"Per-peer duration of initial sync from start until completion or peer disconnect (whichever comes first)",
					)
					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
				)?,
				r,
			)?,
			statement_flooding_detected: register(
				Counter::new(
					"substrate_sync_statement_flooding_detected",
					"Number of peers disconnected for exceeding statement rate limits",
				)?,
				r,
			)?,
		})
	}
}
311
/// Prototype for a [`StatementHandler`]; holds the protocol name and the
/// notification service until `build` assembles the full handler.
pub struct StatementHandlerPrototype {
	protocol_name: ProtocolName,
	notification_service: Box<dyn NotificationService>,
}
317
318impl StatementHandlerPrototype {
319 pub fn new<
321 Hash: AsRef<[u8]>,
322 Block: BlockT,
323 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
324 >(
325 genesis_hash: Hash,
326 fork_id: Option<&str>,
327 metrics: NotificationMetrics,
328 peer_store_handle: Arc<dyn PeerStoreProvider>,
329 ) -> (Self, Net::NotificationProtocolConfig) {
330 let genesis_hash = genesis_hash.as_ref();
331 let hex = array_bytes::bytes2hex("", genesis_hash);
332 let (protocol_name, fallback_name) = if let Some(fork_id) = fork_id {
333 (
334 format!("/{hex}/{fork_id}/{STATEMENT_PROTOCOL_V2}"),
335 format!("/{hex}/{fork_id}/{STATEMENT_PROTOCOL_V1}"),
336 )
337 } else {
338 (format!("/{hex}/{STATEMENT_PROTOCOL_V2}"), format!("/{hex}/{STATEMENT_PROTOCOL_V1}"))
339 };
340 let (config, notification_service) = Net::notification_config(
341 protocol_name.clone().into(),
342 vec![fallback_name.into()],
343 MAX_STATEMENT_NOTIFICATION_SIZE,
344 None,
345 SetConfig {
346 in_peers: 0,
347 out_peers: 0,
348 reserved_nodes: Vec::new(),
349 non_reserved_mode: NonReservedPeerMode::Deny,
350 },
351 metrics,
352 peer_store_handle,
353 );
354
355 (Self { protocol_name: protocol_name.into(), notification_service }, config)
356 }
357
	/// Turns the prototype into the actual handler.
	///
	/// Spawns `num_submission_workers` background tasks via `executor` that pull
	/// incoming statements off a shared queue and submit them to the store, then
	/// assembles the [`StatementHandler`] that drives the gossip protocol.
	/// The handler must then be polled via [`StatementHandler::run`].
	pub fn build<
		N: NetworkPeers + NetworkEventStream,
		S: SyncEventStream + sp_consensus::SyncOracle,
	>(
		self,
		network: N,
		sync: S,
		statement_store: Arc<dyn StatementStore>,
		metrics_registry: Option<&Registry>,
		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
		mut num_submission_workers: usize,
		statements_per_second: u32,
	) -> error::Result<StatementHandler<N, S>> {
		let sync_event_stream = sync.event_stream("statement-handler-sync");
		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);

		// Zero workers would stall all statement imports; fall back to one worker.
		if num_submission_workers == 0 {
			log::warn!(
				target: LOG_TARGET,
				"num_submission_workers is 0, defaulting to 1"
			);
			num_submission_workers = 1;
		}

		// A zero rate would be an unusable quota; fall back to the default rate.
		let statements_per_second = match NonZeroU32::new(statements_per_second) {
			Some(rate) => rate,
			None => {
				log::warn!(
					target: LOG_TARGET,
					"statements_per_second is 0, defaulting to {}",
					DEFAULT_STATEMENTS_PER_SECOND
				);
				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
			},
		};

		let metrics =
			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };

		// Worker pool: each worker pulls `(statement, completion)` pairs off the
		// shared queue, submits the statement to the store, and reports the result
		// back over the oneshot channel.
		for _ in 0..num_submission_workers {
			let store = statement_store.clone();
			let mut queue_receiver = queue_receiver.clone();
			executor(
				async move {
					loop {
						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
							queue_receiver.next().await;
						match task {
							// Channel closed (all senders dropped): worker exits.
							None => return,
							Some((statement, completion)) => {
								let result = store.submit(statement, StatementSource::Network);
								// The receiving side may have gone away; not fatal.
								if completion.send(result).is_err() {
									log::debug!(
										target: LOG_TARGET,
										"Error sending validation completion"
									);
								}
							},
						}
					}
				}
				.boxed(),
			);
		}

		let handler = StatementHandler {
			protocol_name: self.protocol_name,
			notification_service: self.notification_service,
			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream: sync_event_stream.fuse(),
			peers: HashMap::new(),
			statement_store,
			queue_sender,
			statements_per_second,
			metrics,
			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
			pending_affinities_timeout: Box::pin(
				tokio::time::sleep(PENDING_AFFINITIES_INTERVAL).fuse(),
			),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			// Disarmed until `start_sync_recovery` arms it.
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		};

		Ok(handler)
	}
458}
459
/// Handler for statement gossip. Call [`StatementHandler::run`] to start the
/// event-processing loop.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
> {
	protocol_name: ProtocolName,
	/// Interval stream driving periodic statement propagation.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// In-flight statement validations, resolving to the statement hash and the
	/// store's verdict (`None` if the worker dropped the completion channel).
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	/// For each in-flight statement, the set of peers it was received from.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	/// Network service used to send messages and manage peer reputation/sets.
	network: N,
	/// Syncing service, used to detect major sync.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// Notification service for the statement protocol.
	notification_service: Box<dyn NotificationService>,
	/// Per-peer protocol state for all connected peers.
	peers: HashMap<PeerId, Peer>,
	statement_store: Arc<dyn StatementStore>,
	/// Sender feeding statements to the submission workers.
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	/// Per-peer inbound statement rate limit.
	statements_per_second: NonZeroU32,
	/// Prometheus metrics, if a registry was provided.
	metrics: Option<Metrics>,
	/// Timer driving the next initial-sync burst.
	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	/// Timer driving the batched application of pending topic affinities.
	pending_affinities_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	/// Per-peer state of in-progress initial syncs.
	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
	/// Peers waiting for their next initial-sync burst.
	initial_sync_peer_queue: VecDeque<PeerId>,
	/// Peers whose connection was deferred because a major sync was in progress.
	deferred_peers: HashSet<PeerId>,
	/// Whether inbound statements were dropped during a major sync, requiring a
	/// recovery reconnect afterwards.
	dropped_statements_during_sync: bool,
	/// Peer force-disconnected for statement recovery after a major sync, if any.
	sync_recovery_peer: Option<PeerId>,
	/// Timer after which the sync-recovery peer is re-added to the reserved set;
	/// disarmed (`pending()`) while no recovery is in progress.
	sync_recovery_readd_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
}
510
/// Per-peer rate limiter for inbound statement-protocol messages.
#[derive(Debug)]
struct PeerRateLimiter {
	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
519
520impl PeerRateLimiter {
521 fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
522 let quota = Quota::per_second(statements_per_second).allow_burst(burst);
523 Self { limiter: RateLimiter::direct(quota) }
524 }
525
526 fn is_flooding(&self, count: usize) -> bool {
528 if count > u32::MAX as usize {
529 return true;
530 }
531
532 let Some(n) = NonZeroU32::new(count as u32) else {
533 return false;
534 };
535 !matches!(self.limiter.check_n(n), Ok(Ok(())))
536 }
537}
538
/// Per-peer protocol state.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
	/// Statements known to this peer (either received from it or sent to it).
	known_statements: LruHashSet<Hash>,
	/// Limits the rate of statements and affinity updates accepted from this peer.
	rate_limiter: PeerRateLimiter,
	/// Statement protocol version negotiated with this peer.
	protocol_version: PeerProtocolVersion,
	/// Topic-affinity filter currently applied for this peer, if any.
	topic_affinity: Option<AffinityFilter>,
	/// Whether the peer connected as a light client.
	is_light: bool,
	/// Affinity filter received but not yet applied (applied in periodic batches).
	pending_topic_affinity: Option<AffinityFilter>,
}
560
/// State of an in-progress initial sync towards a newly connected peer.
struct PendingInitialSync {
	/// Statement hashes still to be sent to the peer.
	hashes: Vec<Hash>,
	/// When this peer's initial sync started; used for the duration metric.
	started_at: Instant,
}
566
/// Result of [`find_sendable_chunk`].
enum ChunkResult {
	/// The first `usize` statements fit into a single notification.
	Send(usize),
	/// The first statement alone exceeds the notification size limit; skip it.
	SkipOversized,
}
574
/// Result of `send_statement_chunk`.
enum SendChunkResult {
	/// A chunk of the given number of statements was sent.
	Sent(usize),
	/// The leading statement was oversized and skipped.
	Skipped,
	/// There was nothing to send.
	Empty,
	/// Sending failed (unknown peer, or the notification send timed out).
	Failed,
}
586
/// v1 envelope overhead: worst-case `Compact<u32>` length prefix of the statement
/// list (checked against `Compact::<u32>::max_encoded_len()` in
/// `max_statement_payload_size`).
const V1_ENVELOPE_OVERHEAD: usize = 5;

/// v2 envelope overhead: one enum variant index byte on top of the v1 overhead.
const V2_ENVELOPE_OVERHEAD: usize = 1 + V1_ENVELOPE_OVERHEAD;
592
/// Maximum encoded size of the statements payload that fits into one notification
/// once `envelope_overhead` bytes are reserved for the protocol envelope.
fn max_statement_payload_size(envelope_overhead: usize) -> usize {
	debug_assert_eq!(
		V1_ENVELOPE_OVERHEAD,
		Compact::<u32>::max_encoded_len(),
		"V1_ENVELOPE_OVERHEAD must equal Compact::<u32>::max_encoded_len()"
	);
	MAX_STATEMENT_NOTIFICATION_SIZE as usize - envelope_overhead
}
603
604fn find_sendable_chunk(statements: &[&Statement], envelope_overhead: usize) -> ChunkResult {
611 if statements.is_empty() {
612 return ChunkResult::Send(0);
613 }
614 let max_size = max_statement_payload_size(envelope_overhead);
615
616 let mut accumulated_size = 0;
621 let mut count = 0usize;
622
623 for stmt in &statements[0..] {
624 let stmt_size = stmt.encoded_size();
625 let new_count = count + 1;
626 let new_total = accumulated_size + stmt_size;
628 if new_total > max_size {
629 break;
630 }
631
632 accumulated_size += stmt_size;
633 count = new_count;
634 }
635
636 if count == 0 {
638 ChunkResult::SkipOversized
639 } else {
640 ChunkResult::Send(count)
641 }
642}
643
644impl Peer {
645 #[cfg(any(test, feature = "test-helpers"))]
647 pub fn new_for_testing(
648 known_statements: LruHashSet<Hash>,
649 statements_per_second: NonZeroU32,
650 burst: NonZeroU32,
651 ) -> Self {
652 Self {
653 known_statements,
654 rate_limiter: PeerRateLimiter::new(statements_per_second, burst),
655 protocol_version: PeerProtocolVersion::V1,
656 topic_affinity: None,
657 is_light: false,
658 pending_topic_affinity: None,
659 }
660 }
661
662 fn can_receive(&self) -> bool {
666 !(self.is_light &&
667 self.protocol_version == PeerProtocolVersion::V2 &&
668 self.topic_affinity.is_none())
669 }
670}
671
672impl<N, S> StatementHandler<N, S>
673where
674 N: NetworkPeers + NetworkEventStream,
675 S: SyncEventStream + sp_consensus::SyncOracle,
676{
	/// Builds a handler directly from its parts for tests: no metrics are
	/// registered and all internal timers are disarmed (they never fire).
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn new_for_testing(
		protocol_name: ProtocolName,
		notification_service: Box<dyn NotificationService>,
		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
		network: N,
		sync: S,
		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
		peers: HashMap<PeerId, Peer>,
		statement_store: Arc<dyn StatementStore>,
		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
		statements_per_second: NonZeroU32,
	) -> Self {
		Self {
			protocol_name,
			notification_service,
			propagate_timeout,
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream,
			peers,
			statement_store,
			queue_sender,
			statements_per_second,
			metrics: None,
			initial_sync_timeout: Box::pin(pending().fuse()),
			pending_affinities_timeout: Box::pin(pending().fuse()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		}
	}
715
	/// Mutable access to the in-flight statement validation futures, for tests.
	#[cfg(any(test, feature = "test-helpers"))]
	pub fn pending_statements_mut(
		&mut self,
	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
	{
		&mut self.pending_statements
	}
724
	/// Turns the [`StatementHandler`] into a future that should run forever,
	/// driving all protocol timers and reacting to network and sync events.
	/// Returns only when the sync event stream or notification service terminates.
	pub async fn run(mut self) {
		loop {
			futures::select_biased! {
				// Periodic gossip of statements to all peers.
				_ = self.propagate_timeout.next() => {
					self.propagate_statements().await;
					// Sample the validation backlog once per propagation tick.
					self.metrics.as_ref().map(|metrics| {
						metrics.pending_statements.set(self.pending_statements.len() as u64);
					});
				},
				// A queued statement finished validation; settle peer reputations.
				(hash, result) = self.pending_statements.select_next_some() => {
					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
						if let Some(result) = result {
							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
						}
					} else {
						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
					}
				},
				sync_event = self.sync_event_stream.next() => {
					if let Some(sync_event) = sync_event {
						self.handle_sync_event(sync_event);
					} else {
						// Sync event stream closed: shut the handler down.
						return;
					}
				}
				event = self.notification_service.next_event().fuse() => {
					if let Some(event) = event {
						self.handle_notification_event(event).await
					} else {
						// Notification service terminated: shut the handler down.
						return
					}
				}
				// Drip-feed initial-sync bursts to newly connected peers.
				_ = &mut self.initial_sync_timeout => {
					self.process_initial_sync_burst().await;
					self.initial_sync_timeout =
						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
				},
				// Periodically apply topic-affinity filters announced by peers.
				_ = &mut self.pending_affinities_timeout => {
					self.process_pending_affinities();
					self.pending_affinities_timeout =
						Box::pin(tokio::time::sleep(PENDING_AFFINITIES_INTERVAL).fuse());
				},
				// Re-add the peer that was force-disconnected for sync recovery.
				_ = &mut self.sync_recovery_readd_timeout => {
					self.try_readd_sync_recovery_peer();
					// Disarm; `start_sync_recovery` re-arms when needed.
					self.sync_recovery_readd_timeout = Box::pin(pending().fuse());
				},
			}

			// After every event: once major sync is over, connect deferred peers and
			// recover statements that may have been dropped while syncing.
			if !self.sync.is_major_syncing() {
				self.drain_deferred_peers();
				self.start_sync_recovery();
			}
		}
	}
783
	/// Sends as many statements from the front of `statements` as fit into a single
	/// notification to `peer`.
	///
	/// Returns how many statements were consumed (`Sent`), that the first statement
	/// alone was oversized (`Skipped`), that there was nothing to send (`Empty`), or
	/// that the send failed (`Failed`).
	async fn send_statement_chunk(
		&mut self,
		peer: &PeerId,
		statements: &[&Statement],
	) -> SendChunkResult {
		let Some(peer_data) = self.peers.get(peer) else {
			log::error!(target: LOG_TARGET, "Peer {peer} not found in peers map during send_statement_chunk");
			return SendChunkResult::Failed;
		};
		let peer_version = peer_data.protocol_version;
		let envelope_overhead = peer_version.envelope_overhead();
		match find_sendable_chunk(statements, envelope_overhead) {
			ChunkResult::Send(0) => SendChunkResult::Empty,
			ChunkResult::Send(chunk_end) => {
				let chunk = &statements[..chunk_end];
				// v1 peers expect a bare statement list, v2 peers the envelope.
				let encoded = match peer_version {
					PeerProtocolVersion::V1 => chunk.encode(),
					PeerProtocolVersion::V2 => StatementMessage::encode_statement_refs(chunk),
				};
				let bytes_to_send = encoded.len() as u64;

				let sent_latency_timer =
					self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
				// Bound the send so one slow peer cannot stall the handler.
				let send_result = timeout(
					SEND_TIMEOUT,
					self.notification_service.send_async_notification(peer, encoded),
				)
				.await;
				// Dropping the timer records the observed latency.
				drop(sent_latency_timer);

				// NOTE(review): only the timeout error is handled here; if
				// `send_async_notification` itself returns an error (`Ok(Err(_))`),
				// it is treated as a successful send — confirm this is intended.
				if let Err(e) = send_result {
					log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
					return SendChunkResult::Failed;
				}

				log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
				self.metrics.as_ref().map(|metrics| {
					metrics.propagated_statements.inc_by(chunk.len() as u64);
					metrics.bytes_sent_total.inc_by(bytes_to_send);
					metrics.propagated_statements_chunks.observe(chunk.len() as f64);
				});
				SendChunkResult::Sent(chunk_end)
			},
			ChunkResult::SkipOversized => {
				log::warn!(target: LOG_TARGET, "Statement too large, skipping");
				self.metrics.as_ref().map(|metrics| {
					metrics.skipped_oversized_statements.inc();
				});
				SendChunkResult::Skipped
			},
		}
	}
841
842 fn drain_deferred_peers(&mut self) {
844 if self.deferred_peers.is_empty() {
845 return;
846 }
847
848 log::debug!(
849 target: LOG_TARGET,
850 "Major sync complete, adding {} deferred statement peers",
851 self.deferred_peers.len(),
852 );
853
854 let addrs: HashSet<multiaddr::Multiaddr> = self
855 .deferred_peers
856 .drain()
857 .map(|p| {
858 iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
859 })
860 .collect();
861
862 if let Err(err) = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs)
863 {
864 log::warn!(target: LOG_TARGET, "Failed to add deferred peers: {err}");
865 }
866 }
867
	/// If inbound statements were dropped while major-syncing, force-reconnects one
	/// random peer so its statements can be re-synced to us: removing the peer from
	/// the reserved set drops the substream; `try_readd_sync_recovery_peer` later
	/// re-adds it once `SYNC_RECOVERY_READD_DELAY` elapses.
	fn start_sync_recovery(&mut self) {
		if !self.dropped_statements_during_sync {
			return;
		}
		// Reset the flag so recovery is attempted at most once per sync.
		self.dropped_statements_during_sync = false;

		// A recovery round is already in progress.
		if self.sync_recovery_peer.is_some() {
			return;
		}

		// No connected peers: nothing to recover from.
		let Some(&peer_id) = self.peers.keys().choose(&mut rand::thread_rng()) else {
			return;
		};

		log::trace!(
			target: LOG_TARGET,
			"Major sync complete, force-reconnecting {peer_id} for statement recovery",
		);

		if let Err(err) = self.network.remove_peers_from_reserved_set(
			self.protocol_name.clone(),
			iter::once(peer_id).collect(),
		) {
			log::warn!(target: LOG_TARGET, "Failed to remove peer {peer_id} for sync recovery: {err}");
			return;
		}

		self.sync_recovery_peer = Some(peer_id);
		// Arm the re-add timer consumed by `run`.
		self.sync_recovery_readd_timeout =
			Box::pin(tokio::time::sleep(SYNC_RECOVERY_READD_DELAY).fuse());
	}
903
904 fn try_readd_sync_recovery_peer(&mut self) {
906 let Some(peer_id) = self.sync_recovery_peer.take() else { return };
907 log::trace!(
908 target: LOG_TARGET,
909 "Re-adding {peer_id} to reserved set after sync recovery window",
910 );
911 let addr =
912 iter::once(multiaddr::Protocol::P2p(peer_id.into())).collect::<multiaddr::Multiaddr>();
913 if let Err(err) = self
914 .network
915 .add_peers_to_reserved_set(self.protocol_name.clone(), iter::once(addr).collect())
916 {
917 log::warn!(target: LOG_TARGET, "Failed to re-add sync recovery peer {peer_id}: {err}");
918 }
919 }
920
921 fn handle_sync_event(&mut self, event: SyncEvent) {
922 match event {
923 SyncEvent::PeerConnected(remote) => {
924 if self.sync.is_major_syncing() {
925 log::trace!(
926 target: LOG_TARGET,
927 "Major sync in progress, deferring connection to {remote}",
928 );
929 self.deferred_peers.insert(remote);
930 return;
931 }
932 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
933 .collect::<multiaddr::Multiaddr>();
934 let result = self.network.add_peers_to_reserved_set(
935 self.protocol_name.clone(),
936 iter::once(addr).collect(),
937 );
938 if let Err(err) = result {
939 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
940 }
941 },
942 SyncEvent::PeerDisconnected(remote) => {
943 if self.deferred_peers.remove(&remote) {
944 return;
945 }
946 let result = self.network.remove_peers_from_reserved_set(
947 self.protocol_name.clone(),
948 iter::once(remote).collect(),
949 );
950 if let Err(err) = result {
951 log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
952 }
953 },
954 }
955 }
956
	/// Handles a single event from the notification service: inbound substream
	/// validation, peer connect/disconnect bookkeeping, and protocol messages.
	async fn handle_notification_event(&mut self, event: NotificationEvent) {
		match event {
			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
				// Accept the substream only if the handshake yields a known peer role.
				let result = self
					.network
					.peer_role(peer, handshake)
					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
				let _ = result_tx.send(result);
			},
			NotificationEvent::NotificationStreamOpened {
				peer,
				negotiated_fallback,
				handshake,
				..
			} => {
				// A negotiated fallback means the peer only speaks the legacy v1
				// protocol; otherwise the preferred v2 protocol was negotiated.
				let protocol_version = if negotiated_fallback.is_some() {
					PeerProtocolVersion::V1
				} else {
					PeerProtocolVersion::V2
				};
				let Some(peer_role) = self.network.peer_role(peer, handshake) else {
					log::debug!(
						target: LOG_TARGET,
						"Peer {peer} connected but role could not be determined, ignoring"
					);
					return;
				};
				let is_light = peer_role.is_light();
				log::debug!(
					target: LOG_TARGET,
					"Peer {peer} connected with statement protocol {protocol_version:?}, role={peer_role:?}"
				);
				let _was_in = self.peers.insert(
					peer,
					Peer {
						known_statements: LruHashSet::new(
							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
						),
						rate_limiter: PeerRateLimiter::new(
							self.statements_per_second,
							NonZeroU32::new(
								self.statements_per_second.get() *
									config::STATEMENTS_BURST_COEFFICIENT,
							)
							.expect("burst capacity is nonzero"),
						),
						protocol_version,
						topic_affinity: None,
						is_light,
						pending_topic_affinity: None,
					},
				);
				debug_assert!(_was_in.is_none());

				self.metrics.as_ref().map(|metrics| {
					metrics.peers_connected.set(self.peers.len() as u64);
				});

				// v2 light clients must announce an affinity filter before they may
				// receive statements, so only schedule initial sync when allowed.
				if self.peers.get(&peer).map_or(false, |p| p.can_receive()) {
					self.schedule_initial_sync_for_peer(peer);
				}
			},
			NotificationEvent::NotificationStreamClosed { peer } => {
				let _peer = self.peers.remove(&peer);
				debug_assert!(_peer.is_some());
				// Record the aborted initial sync for this peer, if any.
				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
					self.metrics.as_ref().map(|metrics| {
						metrics.initial_sync_peers_active.dec();
						metrics
							.initial_sync_duration_seconds
							.observe(pending.started_at.elapsed().as_secs_f64());
					});
				}
				self.initial_sync_peer_queue.retain(|p| *p != peer);
				self.metrics.as_ref().map(|metrics| {
					metrics.peers_connected.set(self.peers.len() as u64);
				});
			},
			NotificationEvent::NotificationReceived { peer, notification } => {
				let bytes_received = notification.len() as u64;
				self.metrics.as_ref().map(|metrics| {
					metrics.bytes_received_total.inc_by(bytes_received);
				});

				// While major-syncing we cannot validate statements: drop them and
				// remember to trigger recovery once the sync is over.
				if self.sync.is_major_syncing() {
					log::trace!(
						target: LOG_TARGET,
						"{peer}: Ignoring statements while major syncing or offline"
					);
					self.dropped_statements_during_sync = true;
					return;
				}

				let Some(peer_data) = self.peers.get(&peer) else {
					log::error!(target: LOG_TARGET, "Received notification from unknown peer {peer}");
					return;
				};

				match peer_data.protocol_version {
					PeerProtocolVersion::V1 => {
						// v1 notifications are a bare SCALE-encoded statement list.
						if let Ok(statements) =
							<Statements as Decode>::decode(&mut notification.as_ref())
						{
							self.on_statements(peer, statements);
						} else {
							log::debug!(
								target: LOG_TARGET,
								"Failed to decode v1 statement list from {peer}"
							);
							self.network.report_peer(peer, rep::BAD_MESSAGE);
						}
					},
					PeerProtocolVersion::V2 => {
						if let Ok(message) = StatementMessage::decode(&mut notification.as_ref()) {
							match message {
								StatementMessage::Statements(statements) => {
									self.on_statements(peer, statements)
								},
								StatementMessage::ExplicitTopicAffinity(filter) => {
									if let Some(peer_data) = self.peers.get_mut(&peer) {
										// Affinity updates count against the same
										// per-peer rate limit as statements.
										if peer_data.rate_limiter.is_flooding(1) {
											log::debug!(
												target: LOG_TARGET,
												"Rate-limiting ExplicitTopicAffinity from {peer}"
											);
											self.network.report_peer(peer, rep::BAD_MESSAGE);
										} else {
											log::debug!(
												target: LOG_TARGET,
												"Received topic affinity filter from {peer}"
											);
											// Applied in batches by `process_pending_affinities`.
											peer_data.pending_topic_affinity = Some(filter);
										}
									}
								},
							}
						} else {
							log::debug!(
								target: LOG_TARGET,
								"Failed to decode v2 statement message from {peer}"
							);
							self.network.report_peer(peer, rep::BAD_MESSAGE);
						}
					},
				}
			},
		}
	}
1115
1116 #[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
1118 pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
1119 log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
1120
1121 self.metrics.as_ref().map(|metrics| {
1122 metrics.statements_received.inc_by(statements.len() as u64);
1123 });
1124
1125 if let Some(ref mut peer) = self.peers.get_mut(&who) {
1126 if peer.rate_limiter.is_flooding(statements.len()) {
1127 log::warn!(
1128 target: LOG_TARGET,
1129 "Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
1130 who,
1131 self.statements_per_second
1132 );
1133
1134 self.network.report_peer(who, rep::STATEMENT_FLOODING);
1135
1136 self.network.disconnect_peer(who, self.protocol_name.clone());
1138
1139 if let Some(ref metrics) = self.metrics {
1140 metrics.statement_flooding_detected.inc();
1141 }
1142
1143 return;
1144 }
1145
1146 let mut statements_left = statements.len() as u64;
1147 for s in statements {
1148 if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
1149 log::debug!(
1150 target: LOG_TARGET,
1151 "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
1152 statements_left,
1153 MAX_PENDING_STATEMENTS,
1154 );
1155 self.metrics.as_ref().map(|metrics| {
1156 metrics.ignored_statements.inc_by(statements_left);
1157 });
1158 break;
1159 }
1160
1161 let hash = s.hash();
1162 peer.known_statements.insert(hash);
1163
1164 if self.statement_store.has_statement(&hash) {
1165 self.metrics.as_ref().map(|metrics| {
1166 metrics.known_statements_received.inc();
1167 });
1168
1169 if let Some(peers) = self.pending_statements_peers.get(&hash) {
1170 if peers.contains(&who) {
1171 log::trace!(
1172 target: LOG_TARGET,
1173 "Already received the statement from the same peer {who}.",
1174 );
1175 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
1176 }
1177 }
1178 continue;
1179 }
1180
1181 self.network.report_peer(who, rep::ANY_STATEMENT);
1182
1183 match self.pending_statements_peers.entry(hash) {
1184 Entry::Vacant(entry) => {
1185 let (completion_sender, completion_receiver) = oneshot::channel();
1186 match self.queue_sender.try_send((s, completion_sender)) {
1187 Ok(()) => {
1188 self.pending_statements.push(
1189 async move {
1190 let res = completion_receiver.await;
1191 (hash, res.ok())
1192 }
1193 .boxed(),
1194 );
1195 entry.insert(HashSet::from_iter([who]));
1196 },
1197 Err(async_channel::TrySendError::Full(_)) => {
1198 log::debug!(
1199 target: LOG_TARGET,
1200 "Dropped statement because validation channel is full",
1201 );
1202 },
1203 Err(async_channel::TrySendError::Closed(_)) => {
1204 log::trace!(
1205 target: LOG_TARGET,
1206 "Dropped statement because validation channel is closed",
1207 );
1208 },
1209 }
1210 },
1211 Entry::Occupied(mut entry) => {
1212 if !entry.get_mut().insert(who) {
1213 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
1215 }
1216 },
1217 }
1218
1219 statements_left -= 1;
1220 }
1221 }
1222 }
1223
1224 fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
1225 match import {
1226 SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
1227 SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
1228 SubmitResult::KnownExpired => {},
1229 SubmitResult::Rejected(_) => {},
1230 SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
1231 SubmitResult::InternalError(_) => {},
1232 }
1233 }
1234
1235 pub async fn propagate_statement(&mut self, hash: &Hash) {
1237 if self.sync.is_major_syncing() {
1239 return;
1240 }
1241
1242 log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
1243 if let Ok(Some(statement)) = self.statement_store.statement(hash) {
1244 self.do_propagate_statements(&[(*hash, statement)]).await;
1245 }
1246 }
1247
1248 async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
1253 let Some(peer) = self.peers.get_mut(who) else {
1254 return;
1255 };
1256
1257 if !peer.can_receive() {
1258 return;
1259 }
1260
1261 let to_send: Vec<_> = statements
1262 .iter()
1263 .filter_map(|(hash, stmt)| {
1264 if peer.known_statements.contains(hash) {
1265 return None;
1266 }
1267 if peer.topic_affinity.as_ref().is_some_and(|a| !a.matches_statement(stmt)) {
1271 return None;
1272 }
1273 peer.known_statements.insert(*hash);
1274 Some(stmt)
1275 })
1276 .collect();
1277
1278 log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
1279
1280 if to_send.is_empty() {
1281 return;
1282 }
1283
1284 self.send_statements_in_chunks(who, &to_send).await;
1285 }
1286
1287 async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
1289 let mut offset = 0;
1290 while offset < statements.len() {
1291 match self.send_statement_chunk(who, &statements[offset..]).await {
1292 SendChunkResult::Sent(chunk_end) => {
1293 offset += chunk_end;
1294 },
1295 SendChunkResult::Skipped => {
1296 offset += 1;
1297 },
1298 SendChunkResult::Empty | SendChunkResult::Failed => return,
1299 }
1300 }
1301 }
1302
1303 async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
1304 log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
1305 let peers: Vec<_> = self.peers.keys().copied().collect();
1306 for who in peers {
1307 log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
1308 self.send_statements_to_peer(&who, statements).await;
1309 }
1310 log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
1311 }
1312
1313 async fn propagate_statements(&mut self) {
1315 if self.sync.is_major_syncing() {
1317 return;
1318 }
1319
1320 let Ok(statements) = self.statement_store.take_recent_statements() else { return };
1321 if !statements.is_empty() {
1322 self.do_propagate_statements(&statements).await;
1323 }
1324 }
1325
1326 fn schedule_initial_sync_for_peer(&mut self, peer: PeerId) {
1332 if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
1334 self.record_initial_sync_completion(pending.started_at);
1335 self.initial_sync_peer_queue.retain(|p| *p != peer);
1336 }
1337 let hashes = self.statement_store.statement_hashes();
1338 if let Some(peer_data) = self.peers.get_mut(&peer) {
1343 peer_data.known_statements.clear();
1344 }
1345 if !hashes.is_empty() {
1346 self.pending_initial_syncs
1347 .insert(peer, PendingInitialSync { hashes, started_at: Instant::now() });
1348 self.initial_sync_peer_queue.push_back(peer);
1349 self.metrics.as_ref().map(|metrics| {
1350 metrics.initial_sync_peers_active.inc();
1351 });
1352 }
1353 }
1354
1355 fn process_pending_affinities(&mut self) {
1361 let ready_peers: Vec<PeerId> = self
1362 .peers
1363 .iter()
1364 .filter(|(peer_id, peer_data)| {
1365 peer_data.pending_topic_affinity.is_some() &&
1366 !self.pending_initial_syncs.contains_key(peer_id)
1367 })
1368 .map(|(peer_id, _)| *peer_id)
1369 .collect();
1370
1371 for peer_id in ready_peers {
1372 if let Some(peer_data) = self.peers.get_mut(&peer_id) {
1373 peer_data.topic_affinity = peer_data.pending_topic_affinity.take();
1374 }
1375 self.schedule_initial_sync_for_peer(peer_id);
1376 }
1377 }
1378
1379 fn record_initial_sync_completion(&self, started_at: Instant) {
1381 self.metrics.as_ref().map(|metrics| {
1382 metrics.initial_sync_peers_active.dec();
1383 metrics
1384 .initial_sync_duration_seconds
1385 .observe(started_at.elapsed().as_secs_f64());
1386 });
1387 }
1388
	/// Send one size-bounded batch ("burst") of initial-sync statements to the
	/// peer at the front of the queue, re-queueing the peer if hashes remain.
	async fn process_initial_sync_burst(&mut self) {
		// No bursting while we are still catching up ourselves.
		if self.sync.is_major_syncing() {
			return;
		}

		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
			return;
		};

		// The queue may hold stale entries for peers whose sync was already
		// removed; those no longer have a pending entry and are dropped here.
		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
			return;
		};

		self.metrics.as_ref().map(|metrics| {
			metrics.initial_sync_bursts_total.inc();
		});

		// Everything already handled: finish the sync for this peer.
		if entry.get().hashes.is_empty() {
			let started_at = entry.get().started_at;
			entry.remove();
			self.record_initial_sync_completion(started_at);
			return;
		}

		let Some(peer_data) = self.peers.get(&peer_id) else {
			log::error!(target: LOG_TARGET, "Peer {peer_id} has pending initial sync but is not in peers map");
			entry.remove();
			return;
		};
		// Payload budget depends on the per-protocol-version envelope size.
		let envelope_overhead = peer_data.protocol_version.envelope_overhead();
		let max_size = max_statement_payload_size(envelope_overhead);
		let mut accumulated_size = 0;
		// Scan the pending hashes, taking statements until the next one would
		// overflow the chunk budget; `Abort` stops the scan so the remainder
		// is retried in a later burst.
		let (statements, processed) = match self.statement_store.statements_by_hashes(
			&entry.get().hashes,
			&mut |hash, encoded, stmt| {
				if peer_data.known_statements.contains(hash) {
					return FilterDecision::Skip;
				}
				if peer_data.topic_affinity.as_ref().is_some_and(|a| !a.matches_statement(stmt)) {
					return FilterDecision::Skip;
				}
				// `accumulated_size > 0` means the first statement is always
				// admitted, even when it alone exceeds `max_size`.
				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
					return FilterDecision::Abort;
				}
				accumulated_size += encoded.len();
				FilterDecision::Take
			},
		) {
			Ok(r) => r,
			Err(e) => {
				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
				let started_at = entry.get().started_at;
				entry.remove();
				self.record_initial_sync_completion(started_at);
				return;
			},
		};

		// Drop the prefix the store has looked at (taken or skipped).
		entry.get_mut().hashes.drain(..processed);
		let has_more = !entry.get().hashes.is_empty();
		// Release the map borrow before the await point below.
		drop(entry);

		let send_stmts: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
		match self.send_statement_chunk(&peer_id, &send_stmts).await {
			SendChunkResult::Failed => {
				// Sending failed; abandon the rest of this peer's sync.
				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
					self.record_initial_sync_completion(pending.started_at);
				}
				return;
			},
			SendChunkResult::Sent(sent) => {
				debug_assert_eq!(send_stmts.len(), sent);
				self.metrics.as_ref().map(|metrics| {
					metrics.initial_sync_statements_sent.inc_by(sent as u64);
				});
				// Only mark statements as known after they were actually sent.
				if let Some(peer) = self.peers.get_mut(&peer_id) {
					for (hash, _) in &statements {
						peer.known_statements.insert(*hash);
					}
				}
			},
			SendChunkResult::Empty | SendChunkResult::Skipped => {},
		}

		if has_more {
			// Round-robin: go to the back so other peers get a turn.
			self.initial_sync_peer_queue.push_back(peer_id);
		} else {
			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
				self.record_initial_sync_completion(pending.started_at);
			}
		}
	}
1492}
1493
1494#[cfg(test)]
1495mod tests {
1496
1497 use super::*;
1498 use std::sync::{
1499 atomic::{AtomicBool, Ordering},
1500 Mutex,
1501 };
1502
	// Fixed seed constant for deterministic test behaviour. NOTE(review): not
	// referenced in this part of the file — the name suggests it seeds a bloom
	// filter in affinity-related tests; confirm before removing.
	const BLOOM_SEED: u128 = 0x5EED_5EED_5EED_5EED;
1505
	/// `NetworkPeers` test double that records the calls made against it.
	#[derive(Clone)]
	struct TestNetwork {
		// Every (peer, reputation change) passed to `report_peer`.
		reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
		// Every peer passed to `disconnect_peer`.
		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
		// Role returned from `peer_role` for every peer.
		default_role: sc_network::ObservedRole,
		// Address sets passed to `add_peers_to_reserved_set`.
		added_reserved: Arc<Mutex<Vec<HashSet<sc_network::Multiaddr>>>>,
		// Peer lists passed to `remove_peers_from_reserved_set`.
		removed_reserved: Arc<Mutex<Vec<Vec<PeerId>>>>,
	}
1515
1516 impl TestNetwork {
1517 fn new() -> Self {
1518 Self {
1519 reported_peers: Arc::new(Mutex::new(Vec::new())),
1520 disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1521 default_role: sc_network::ObservedRole::Full,
1522 added_reserved: Arc::new(Mutex::new(Vec::new())),
1523 removed_reserved: Arc::new(Mutex::new(Vec::new())),
1524 }
1525 }
1526
1527 fn new_light() -> Self {
1528 Self {
1529 reported_peers: Arc::new(Mutex::new(Vec::new())),
1530 disconnected_peers: Arc::new(Mutex::new(Vec::new())),
1531 default_role: sc_network::ObservedRole::Light,
1532 added_reserved: Arc::new(Mutex::new(Vec::new())),
1533 removed_reserved: Arc::new(Mutex::new(Vec::new())),
1534 }
1535 }
1536
1537 fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
1538 self.reported_peers.lock().unwrap().clone()
1539 }
1540
1541 fn get_disconnected_peers(&self) -> Vec<PeerId> {
1542 self.disconnected_peers.lock().unwrap().clone()
1543 }
1544
1545 fn get_added_reserved(&self) -> Vec<HashSet<sc_network::Multiaddr>> {
1546 self.added_reserved.lock().unwrap().clone()
1547 }
1548
1549 fn get_removed_reserved(&self) -> Vec<Vec<PeerId>> {
1550 self.removed_reserved.lock().unwrap().clone()
1551 }
1552 }
1553
	// Records reputation reports, disconnects and reserved-set changes; every
	// method the statement handler does not exercise is left `unimplemented!`.
	#[async_trait::async_trait]
	impl NetworkPeers for TestNetwork {
		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
			unimplemented!()
		}

		fn set_authorized_only(&self, _: bool) {
			unimplemented!()
		}

		fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
			unimplemented!()
		}

		fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
			// Recorded for later inspection via `get_reports`.
			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
		}

		fn peer_reputation(&self, _: &PeerId) -> i32 {
			unimplemented!()
		}

		fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
			// Recorded for later inspection via `get_disconnected_peers`.
			self.disconnected_peers.lock().unwrap().push(peer);
		}

		fn accept_unreserved_peers(&self) {
			unimplemented!()
		}

		fn deny_unreserved_peers(&self) {
			unimplemented!()
		}

		fn add_reserved_peer(
			&self,
			_: sc_network::config::MultiaddrWithPeerId,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_reserved_peer(&self, _: PeerId) {
			unimplemented!()
		}

		fn set_reserved_peers(
			&self,
			_: sc_network::ProtocolName,
			_: std::collections::HashSet<sc_network::Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn add_peers_to_reserved_set(
			&self,
			_: sc_network::ProtocolName,
			addrs: std::collections::HashSet<sc_network::Multiaddr>,
		) -> Result<(), String> {
			// Recorded for later inspection via `get_added_reserved`.
			self.added_reserved.lock().unwrap().push(addrs);
			Ok(())
		}

		fn remove_peers_from_reserved_set(
			&self,
			_: sc_network::ProtocolName,
			peers: Vec<PeerId>,
		) -> Result<(), String> {
			// Recorded for later inspection via `get_removed_reserved`.
			self.removed_reserved.lock().unwrap().push(peers);
			Ok(())
		}

		fn sync_num_connected(&self) -> usize {
			unimplemented!()
		}

		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
			// Every peer appears with the configured default role.
			Some(self.default_role)
		}

		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
			unimplemented!();
		}
	}
1637
	/// Sync-oracle test double backed by a shared boolean flag.
	#[derive(Clone)]
	struct TestSync {
		// Value returned by `is_major_syncing`; shared so tests can flip it.
		major_syncing: Arc<AtomicBool>,
	}
1642
1643 impl TestSync {
1644 fn new() -> Self {
1645 Self { major_syncing: Arc::new(AtomicBool::new(false)) }
1646 }
1647
1648 fn with_syncing(initial: bool) -> (Self, Arc<AtomicBool>) {
1649 let flag = Arc::new(AtomicBool::new(initial));
1650 (Self { major_syncing: flag.clone() }, flag)
1651 }
1652 }
1653
1654 impl SyncEventStream for TestSync {
1655 fn event_stream(
1656 &self,
1657 _name: &'static str,
1658 ) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
1659 Box::pin(futures::stream::pending())
1660 }
1661 }
1662
	impl sp_consensus::SyncOracle for TestSync {
		fn is_major_syncing(&self) -> bool {
			// Reflects whatever the test last stored in the shared flag.
			self.major_syncing.load(Ordering::Relaxed)
		}

		fn is_offline(&self) -> bool {
			// Not exercised by these tests.
			unimplemented!()
		}
	}
1672
	impl NetworkEventStream for TestNetwork {
		fn event_stream(
			&self,
			_name: &'static str,
		) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
			// The handler under test never subscribes to network events here.
			unimplemented!()
		}
	}
1681
	/// `NotificationService` test double that records every sent notification.
	#[derive(Debug, Clone)]
	struct TestNotificationService {
		// Every (recipient, payload) handed to the send methods.
		sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
	}
1686
1687 impl TestNotificationService {
1688 fn new() -> Self {
1689 Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1690 }
1691
1692 fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1693 self.sent_notifications.lock().unwrap().clone()
1694 }
1695
1696 fn clear_sent_notifications(&self) {
1697 self.sent_notifications.lock().unwrap().clear();
1698 }
1699 }
1700
	// Notification-service double: records sent notifications and reports no
	// incoming events (`next_event` returns `None`); the rest is
	// unimplemented.
	#[async_trait::async_trait]
	impl NotificationService for TestNotificationService {
		async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!()
		}

		async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
			unimplemented!()
		}

		fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
			// Recorded for inspection via `get_sent_notifications`.
			self.sent_notifications.lock().unwrap().push((*peer, notification));
		}

		async fn send_async_notification(
			&mut self,
			peer: &PeerId,
			notification: Vec<u8>,
		) -> Result<(), sc_network::error::Error> {
			// Recorded for inspection via `get_sent_notifications`.
			self.sent_notifications.lock().unwrap().push((*peer, notification));
			Ok(())
		}

		async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!()
		}

		fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
			unimplemented!()
		}

		async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
			// No inbound notification events are ever produced in tests.
			None
		}

		fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
			unimplemented!()
		}

		fn protocol(&self) -> &sc_network::types::ProtocolName {
			unimplemented!()
		}

		fn message_sink(
			&self,
			_peer: &PeerId,
		) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
			unimplemented!()
		}
	}
1751
	/// In-memory `StatementStore` double.
	#[derive(Clone)]
	struct TestStatementStore {
		// Persistent statement content, keyed by hash.
		statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
		// Not-yet-propagated set, drained by `take_recent_statements`.
		recent_statements:
			Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
	}
1758
1759 impl TestStatementStore {
1760 fn new() -> Self {
1761 Self { statements: Default::default(), recent_statements: Default::default() }
1762 }
1763 }
1764
	// Only the store methods the handler exercises are implemented; the
	// remainder panic with `unimplemented!`.
	impl StatementStore for TestStatementStore {
		fn statements(
			&self,
		) -> sp_statement_store::Result<
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
		> {
			Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
		}

		fn take_recent_statements(
			&self,
		) -> sp_statement_store::Result<
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
		> {
			// `drain` empties the map, so a second call returns nothing —
			// matching the "take" contract.
			Ok(self.recent_statements.lock().unwrap().drain().collect())
		}

		fn statement(
			&self,
			_hash: &sp_statement_store::Hash,
		) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
			unimplemented!()
		}

		fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
			self.statements.lock().unwrap().contains_key(hash)
		}

		fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
			self.statements.lock().unwrap().keys().cloned().collect()
		}

		fn statements_by_hashes(
			&self,
			hashes: &[sp_statement_store::Hash],
			filter: &mut dyn FnMut(
				&sp_statement_store::Hash,
				&[u8],
				&sp_statement_store::Statement,
			) -> FilterDecision,
		) -> sp_statement_store::Result<(
			Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
			usize,
		)> {
			let statements = self.statements.lock().unwrap();
			let mut result = Vec::new();
			let mut processed = 0;
			for hash in hashes {
				// Unknown hashes count as processed so callers can drain them
				// from their pending lists.
				let Some(stmt) = statements.get(hash) else {
					processed += 1;
					continue;
				};
				let encoded = stmt.encode();
				match filter(hash, &encoded, stmt) {
					FilterDecision::Skip => {
						processed += 1;
					},
					FilterDecision::Take => {
						processed += 1;
						result.push((*hash, stmt.clone()));
					},
					// Abort leaves the remaining hashes unprocessed.
					FilterDecision::Abort => break,
				}
			}
			Ok((result, processed))
		}

		fn broadcasts(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_clear(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn broadcasts_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn posted_clear_stmt(
			&self,
			_match_all_topics: &[sp_statement_store::Topic],
			_dest: [u8; 32],
		) -> sp_statement_store::Result<Vec<Vec<u8>>> {
			unimplemented!()
		}

		fn submit(
			&self,
			_statement: sp_statement_store::Statement,
			_source: sp_statement_store::StatementSource,
		) -> sp_statement_store::SubmitResult {
			unimplemented!()
		}

		fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
			unimplemented!()
		}

		fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
			unimplemented!()
		}
	}
1894
	/// Build a `StatementHandler` wired to test doubles, with `num_peers`
	/// pre-connected full-node peers (V1 protocol, no topic affinity).
	///
	/// Returns the handler plus handles to the store, network, notification
	/// service, the validation-queue receiver and the generated peer ids.
	fn build_handler(
		num_peers: usize,
	) -> (
		StatementHandler<TestNetwork, TestSync>,
		TestStatementStore,
		TestNetwork,
		TestNotificationService,
		async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
		Vec<PeerId>,
	) {
		let statement_store = TestStatementStore::new();
		let (queue_sender, queue_receiver) = async_channel::bounded(100);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let mut peers = HashMap::new();
		let mut peer_ids = Vec::with_capacity(num_peers);

		for _ in 0..num_peers {
			let peer_id = PeerId::random();
			peer_ids.push(peer_id);
			peers.insert(
				peer_id,
				Peer {
					known_statements: LruHashSet::new(NonZeroUsize::new(1000).unwrap()),
					rate_limiter: PeerRateLimiter::new(
						NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
							.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
						NonZeroU32::new(
							DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
						)
						.expect("burst capacity is nonzero"),
					),
					protocol_version: PeerProtocolVersion::V1,
					topic_affinity: None,
					is_light: false,
					pending_topic_affinity: None,
				},
			);
		}

		let handler = StatementHandler {
			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
			notification_service: Box::new(notification_service.clone()),
			// Pending streams/futures never fire: tests drive the handler's
			// methods explicitly instead of running its event loop on timers.
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync: TestSync::new(),
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers,
			statement_store: Arc::new(statement_store.clone()),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_affinities_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
		};
		(handler, statement_store, network, notification_service, queue_receiver, peer_ids)
	}
1965
1966 fn get_peer_hashes(sent: &[(PeerId, Vec<u8>)], peer: PeerId) -> Vec<sp_statement_store::Hash> {
1967 sent.iter()
1968 .filter(|(p, _)| *p == peer)
1969 .flat_map(|(_, notification)| {
1970 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1971 })
1972 .map(|s| s.hash())
1973 .collect()
1974 }
1975
	/// Replay every disconnect recorded by `network` into the handler as a
	/// `NotificationStreamClosed` event, mirroring what the real backend
	/// would deliver after `disconnect_peer`.
	async fn dispatch_disconnects(
		handler: &mut StatementHandler<TestNetwork, TestSync>,
		network: &TestNetwork,
	) {
		for peer in network.get_disconnected_peers() {
			handler
				.handle_notification_event(NotificationEvent::NotificationStreamClosed { peer })
				.await;
		}
	}
1988
	#[tokio::test]
	async fn test_skips_processing_statements_that_already_in_store() {
		let (mut handler, statement_store, _network, _notification_service, queue_receiver, _) =
			build_handler(1);

		// statement1 is pre-seeded into the store, so it must be skipped.
		let mut statement1 = Statement::new();
		statement1.set_plain_data(b"statement1".to_vec());
		let hash1 = statement1.hash();

		statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());

		// statement2 is unknown and should reach the validation queue.
		let mut statement2 = Statement::new();
		statement2.set_plain_data(b"statement2".to_vec());
		let hash2 = statement2.hash();

		let peer_id = *handler.peers.keys().next().unwrap();

		handler.on_statements(peer_id, vec![statement1, statement2]);

		let to_submit = queue_receiver.try_recv();
		assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");

		let no_more = queue_receiver.try_recv();
		assert!(no_more.is_err(), "Expected only one statement to be queued");
	}
2014
	#[tokio::test]
	async fn test_reports_for_duplicate_statements() {
		let (mut handler, statement_store, network, _notification_service, queue_receiver, _) =
			build_handler(1);

		let peer_id = *handler.peers.keys().next().unwrap();

		let mut statement1 = Statement::new();
		statement1.set_plain_data(b"statement1".to_vec());

		handler.on_statements(peer_id, vec![statement1.clone()]);
		{
			// Simulate a successful import: take the queued statement, place
			// it in the store, and issue the refund the real flow would.
			let (s, _) = queue_receiver.try_recv().unwrap();
			let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
			handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
		}

		// Same peer resends the now-known statement — must be penalized.
		handler.on_statements(peer_id, vec![statement1]);

		let reports = network.get_reports();
		assert_eq!(
			reports,
			vec![
				(peer_id, rep::ANY_STATEMENT), (peer_id, rep::ANY_STATEMENT_REFUND), (peer_id, rep::DUPLICATE_STATEMENT) ],
			"Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
			reports
		);
	}
2047
	#[tokio::test]
	async fn test_splits_large_batches_into_smaller_chunks() {
		let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
			build_handler(1);

		let num_statements = 30;
		// 100 KiB per statement, so the batch cannot fit one notification.
		let statement_size = 100 * 1024; for i in 0..num_statements {
			let mut statement = Statement::new();
			let mut data = vec![0u8; statement_size];
			// Vary the first byte so every statement hashes differently.
			data[0] = i as u8;
			statement.set_plain_data(data);
			let hash = statement.hash();
			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
		}

		handler.propagate_statements().await;

		let sent = notification_service.get_sent_notifications();
		let mut total_statements_sent = 0;
		assert!(
			sent.len() == 3,
			"Expected batch to be split into 3 chunks, but got {} chunks",
			sent.len()
		);
		for (_peer, notification) in sent.iter() {
			assert!(
				notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
				"Notification size {} exceeds limit {}",
				notification.len(),
				MAX_STATEMENT_NOTIFICATION_SIZE
			);
			if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
				total_statements_sent += stmts.len();
			}
		}

		// Chunking must not drop anything.
		assert_eq!(
			total_statements_sent, num_statements,
			"Expected all {} statements to be sent, but only {} were sent",
			num_statements, total_statements_sent
		);
	}
2091
	#[tokio::test]
	async fn test_skips_only_oversized_statements() {
		let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
			build_handler(1);

		// Small statements interleaved with ones too large for any
		// notification; only the small ones should go out.
		let mut statement1 = Statement::new();
		statement1.set_plain_data(vec![1u8; 100]);
		let hash1 = statement1.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash1, statement1.clone());

		let mut oversized1 = Statement::new();
		oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
		let hash_oversized1 = oversized1.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_oversized1, oversized1);

		let mut statement2 = Statement::new();
		statement2.set_plain_data(vec![3u8; 100]);
		let hash2 = statement2.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash2, statement2.clone());

		// Borderline case: payload exactly at the notification size limit is
		// still oversized once the envelope is added.
		let mut oversized2 = Statement::new();
		oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
		let hash_oversized2 = oversized2.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_oversized2, oversized2);

		let mut statement3 = Statement::new();
		statement3.set_plain_data(vec![5u8; 100]);
		let hash3 = statement3.hash();
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash3, statement3.clone());

		handler.propagate_statements().await;

		let sent = notification_service.get_sent_notifications();

		let mut sent_hashes = sent
			.iter()
			.flat_map(|(_peer, notification)| {
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect::<Vec<_>>();
		sent_hashes.sort();
		let mut expected_hashes = vec![hash1, hash2, hash3];
		expected_hashes.sort();
		assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
	}
2158
2159 fn build_handler_no_peers() -> (
2160 StatementHandler<TestNetwork, TestSync>,
2161 TestStatementStore,
2162 TestNetwork,
2163 TestNotificationService,
2164 ) {
2165 let statement_store = TestStatementStore::new();
2166 let (queue_sender, _queue_receiver) = async_channel::bounded(2);
2167 let network = TestNetwork::new();
2168 let notification_service = TestNotificationService::new();
2169
2170 let handler = StatementHandler {
2171 protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
2172 notification_service: Box::new(notification_service.clone()),
2173 propagate_timeout: (Box::pin(futures::stream::pending())
2174 as Pin<Box<dyn Stream<Item = ()> + Send>>)
2175 .fuse(),
2176 pending_statements: FuturesUnordered::new(),
2177 pending_statements_peers: HashMap::new(),
2178 network: network.clone(),
2179 sync: TestSync::new(),
2180 sync_event_stream: (Box::pin(futures::stream::pending())
2181 as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
2182 .fuse(),
2183 peers: HashMap::new(),
2184 statement_store: Arc::new(statement_store.clone()),
2185 queue_sender,
2186 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
2187 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
2188 metrics: None,
2189 initial_sync_timeout: Box::pin(futures::future::pending()),
2190 pending_affinities_timeout: Box::pin(futures::future::pending()),
2191 pending_initial_syncs: HashMap::new(),
2192 initial_sync_peer_queue: VecDeque::new(),
2193 deferred_peers: HashSet::new(),
2194 dropped_statements_during_sync: false,
2195 sync_recovery_peer: None,
2196 sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
2197 };
2198 (handler, statement_store, network, notification_service)
2199 }
2200
	/// Like `build_handler_no_peers`, but the network reports every peer as a
	/// light client (`TestNetwork::new_light`).
	fn build_handler_no_peers_light() -> (
		StatementHandler<TestNetwork, TestSync>,
		TestStatementStore,
		TestNetwork,
		TestNotificationService,
	) {
		let statement_store = TestStatementStore::new();
		// The receiver is dropped on return, closing the validation queue.
		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
		let network = TestNetwork::new_light();
		let notification_service = TestNotificationService::new();

		let handler = StatementHandler {
			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
			notification_service: Box::new(notification_service.clone()),
			// Pending streams/futures never fire: tests drive the handler's
			// methods explicitly instead of running its event loop on timers.
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync: TestSync::new(),
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers: HashMap::new(),
			statement_store: Arc::new(statement_store.clone()),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_affinities_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
		};
		(handler, statement_store, network, notification_service)
	}
2243
	#[tokio::test]
	async fn test_initial_sync_burst_single_peer() {
		let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);

		let num_statements = 200;
		// 100 KiB per statement, forcing the sync across many bursts.
		let statement_size = 100 * 1024; let mut expected_hashes = Vec::new();
		for i in 0..num_statements {
			let mut statement = Statement::new();
			let mut data = vec![0u8; statement_size];
			// Two varying bytes keep all 200 statement hashes distinct.
			data[0] = (i % 256) as u8;
			data[1] = (i / 256) as u8;
			statement.set_plain_data(data);
			let hash = statement.hash();
			expected_hashes.push(hash);
			statement_store.statements.lock().unwrap().insert(hash, statement);
		}

		let peer_id = PeerId::random();

		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
			})
			.await;

		// Opening the stream must schedule an initial sync for the new peer.
		assert!(handler.peers.contains_key(&peer_id));
		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
		assert_eq!(handler.initial_sync_peer_queue.len(), 1);

		let mut burst_count = 0;
		while handler.pending_initial_syncs.contains_key(&peer_id) {
			handler.process_initial_sync_burst().await;
			burst_count += 1;
			assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
		}

		assert!(
			burst_count >= 10,
			"Expected multiple bursts for 200 statements of 100KB each, got {}",
			burst_count
		);

		let sent = notification_service.get_sent_notifications();
		let mut sent_hashes: Vec<_> = sent
			.iter()
			.flat_map(|(peer, notification)| {
				assert_eq!(*peer, peer_id);
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect();
		sent_hashes.sort();
		expected_hashes.sort();

		assert_eq!(
			sent_hashes.len(),
			expected_hashes.len(),
			"Expected {} statements to be sent, got {}",
			expected_hashes.len(),
			sent_hashes.len()
		);
		assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");

		// Sync state must be fully torn down once everything was delivered.
		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
		assert!(handler.initial_sync_peer_queue.is_empty());
	}
2325
	#[tokio::test]
	async fn test_initial_sync_burst_multiple_peers_round_robin() {
		// With several peers syncing at once, bursts must be scheduled round-robin
		// so no single peer monopolizes the sync bandwidth.
		let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);

		// Same large dataset as the single-peer test: 200 statements of ~100KiB.
		let num_statements = 200;
		let statement_size = 100 * 1024; let mut expected_hashes = Vec::new();
		for i in 0..num_statements {
			let mut statement = Statement::new();
			let mut data = vec![0u8; statement_size];
			// Make each statement's content (and hash) unique.
			data[0] = (i % 256) as u8;
			data[1] = (i / 256) as u8;
			statement.set_plain_data(data);
			let hash = statement.hash();
			expected_hashes.push(hash);
			statement_store.statements.lock().unwrap().insert(hash, statement);
		}

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		// Connect the three peers in a known order; the queue should preserve it.
		for peer in [peer1, peer2, peer3] {
			handler
				.handle_notification_event(NotificationEvent::NotificationStreamOpened {
					peer,
					direction: sc_network::service::traits::Direction::Inbound,
					handshake: vec![],
					negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
				})
				.await;
		}

		assert_eq!(handler.peers.len(), 3);
		assert_eq!(handler.pending_initial_syncs.len(), 3);
		assert_eq!(handler.initial_sync_peer_queue.len(), 3);

		// Record which peer is at the front of the queue before each burst so the
		// scheduling order can be checked afterwards.
		let mut peer_burst_order = Vec::new();
		let mut burst_count = 0;

		while !handler.pending_initial_syncs.is_empty() {
			if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
				peer_burst_order.push(next_peer);
			}
			handler.process_initial_sync_burst().await;
			burst_count += 1;
			assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
		}

		assert!(
			burst_count >= 30,
			"Expected many bursts for 3 peers with 200 statements each, got {}",
			burst_count
		);

		// The first two full rounds must alternate peer1 -> peer2 -> peer3.
		assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
		assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
		assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
		assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
		assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
		assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
		assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");

		// Each peer must independently receive the complete statement set.
		let sent = notification_service.get_sent_notifications();
		let mut peer1_hashes = get_peer_hashes(&sent, peer1);
		let mut peer2_hashes = get_peer_hashes(&sent, peer2);
		let mut peer3_hashes = get_peer_hashes(&sent, peer3);

		peer1_hashes.sort();
		peer2_hashes.sort();
		peer3_hashes.sort();
		expected_hashes.sort();

		assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
		assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
		assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");

		assert!(handler.pending_initial_syncs.is_empty());
		assert!(handler.initial_sync_peer_queue.is_empty());
	}
2420
2421 #[tokio::test]
2422 async fn test_send_statements_in_chunks_exact_max_size() {
2423 let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
2424 build_handler(1);
2425
2426 let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
2440 let num_statements: usize = 100;
2441 let per_statement_overhead = 1 + 1 + 8 + 1 + 2; let total_overhead = per_statement_overhead * num_statements;
2443 let total_data_size = max_size - total_overhead;
2444 let per_statement_data_size = total_data_size / num_statements;
2445 let remainder = total_data_size % num_statements;
2446
2447 let mut expected_hashes = Vec::with_capacity(num_statements);
2448 let mut total_encoded_size = 0;
2449
2450 for i in 0..num_statements {
2451 let mut statement = Statement::new();
2452 let extra = if i < remainder { 1 } else { 0 };
2454 let mut data = vec![42u8; per_statement_data_size + extra];
2455 data[0] = i as u8;
2457 data[1] = (i >> 8) as u8;
2458 statement.set_plain_data(data);
2459
2460 total_encoded_size += statement.encoded_size();
2461
2462 let hash = statement.hash();
2463 expected_hashes.push(hash);
2464 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
2465 }
2466
2467 assert!(
2469 total_encoded_size == max_size,
2470 "Total encoded size {} should be <= max_size {}",
2471 total_encoded_size,
2472 max_size
2473 );
2474
2475 handler.propagate_statements().await;
2476
2477 let sent = notification_service.get_sent_notifications();
2478
2479 assert_eq!(
2481 sent.len(),
2482 1,
2483 "Expected 1 notification for all {} statements, but got {}",
2484 num_statements,
2485 sent.len()
2486 );
2487
2488 let (_peer, notification) = &sent[0];
2489 assert!(
2490 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
2491 "Notification size {} exceeds limit {}",
2492 notification.len(),
2493 MAX_STATEMENT_NOTIFICATION_SIZE
2494 );
2495
2496 let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
2497 assert_eq!(
2498 decoded.len(),
2499 num_statements,
2500 "Expected {} statements in the notification",
2501 num_statements
2502 );
2503
2504 let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
2506 expected_hashes.sort();
2507 received_hashes.sort();
2508 assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
2509 }
2510
	#[tokio::test]
	async fn test_initial_sync_burst_size_limit_consistency() {
		// The size filter used when assembling an initial-sync burst must agree
		// with the actual payload limit: a statement that does not fit is deferred
		// to the NEXT burst, never dropped.
		let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);

		let payload_limit = max_statement_payload_size(V1_ENVELOPE_OVERHEAD);

		// First statement takes a bit more than half the payload budget.
		let first_stmt_data_size = payload_limit / 2 + 10;
		let mut stmt1 = Statement::new();
		stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
		let stmt1_encoded_size = stmt1.encoded_size();

		// Size the second statement to overshoot the remaining budget by a few
		// bytes, so it cannot share a burst with the first one.
		let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
		let target_stmt2_encoded = remaining + 3; let stmt2_data_size = target_stmt2_encoded.saturating_sub(4); let mut stmt2 = Statement::new();
		stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
		let stmt2_encoded_size = stmt2.encoded_size();

		let total_encoded = stmt1_encoded_size + stmt2_encoded_size;

		// Guard the setup arithmetic: the pair genuinely must not fit together.
		assert!(
			total_encoded > payload_limit,
			"Total {} should exceed payload_limit {} so filter rejects second statement",
			total_encoded,
			payload_limit
		);

		let hash1 = stmt1.hash();
		let hash2 = stmt2.hash();
		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
		statement_store.statements.lock().unwrap().insert(hash2, stmt2);

		let peer_id = PeerId::random();

		// Connecting schedules an initial sync covering both statements.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
			})
			.await;

		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);

		handler.process_initial_sync_burst().await;

		// First burst: only one statement fits.
		let sent = notification_service.get_sent_notifications();
		assert_eq!(sent.len(), 1, "First burst should send one notification");

		let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
		assert_eq!(decoded.len(), 1, "First notification should contain one statement");

		// Iteration order over the store is not fixed, so either statement may go first.
		let sent_hash = decoded[0].hash();
		assert!(
			sent_hash == hash1 || sent_hash == hash2,
			"Sent statement should be one of the two created"
		);

		// The statement that did not fit must still be pending, not silently dropped.
		assert!(handler.pending_initial_syncs.contains_key(&peer_id));
		assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);

		handler.process_initial_sync_burst().await;

		// Second burst delivers the remaining statement.
		let sent = notification_service.get_sent_notifications();
		assert_eq!(sent.len(), 2, "Second burst should send another notification");

		let mut sent_hashes: Vec<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect();
		sent_hashes.sort();
		let mut expected_hashes = vec![hash1, hash2];
		expected_hashes.sort();
		assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");

		assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
	}
2618
2619 #[tokio::test]
2620 async fn test_peer_disconnected_on_flooding() {
2621 let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2622 build_handler(1);
2623
2624 let peer_id = *handler.peers.keys().next().unwrap();
2625
2626 let mut flood_statements = Vec::new();
2627 for i in 0..600_000 {
2628 let mut statement = Statement::new();
2629 statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2630 flood_statements.push(statement);
2631 }
2632
2633 handler.on_statements(peer_id, flood_statements);
2634
2635 let reports = network.get_reports();
2636 assert!(
2637 reports
2638 .iter()
2639 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2640 "Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2641 reports
2642 );
2643
2644 let disconnected = network.get_disconnected_peers();
2645 assert!(
2646 disconnected.contains(&peer_id),
2647 "Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2648 peer_id,
2649 disconnected
2650 );
2651
2652 dispatch_disconnects(&mut handler, &network).await;
2653
2654 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2656 assert!(
2657 !handler.pending_initial_syncs.contains_key(&peer_id),
2658 "Peer should be removed from pending_initial_syncs"
2659 );
2660 assert!(
2661 !handler.initial_sync_peer_queue.contains(&peer_id),
2662 "Peer should be removed from initial_sync_peer_queue"
2663 );
2664 }
2665
	#[tokio::test]
	async fn test_legitimate_traffic_not_flagged() {
		// Sustained traffic at a modest rate (5k statements per 100ms = 50k/sec)
		// must stay under the flooding threshold: no report, no disconnect.
		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
			build_handler(1);

		let peer_id = *handler.peers.keys().next().unwrap();

		// Run real wall-clock time so the rate limiter's refill behaviour is
		// exercised, not just the initial burst allowance.
		let start = std::time::Instant::now();
		let duration = std::time::Duration::from_secs(5);
		let mut counter = 0u32;

		while start.elapsed() < duration {
			let mut statements = Vec::new();
			for i in 0..5_000 {
				let mut statement = Statement::new();
				// Unique payload per statement: global counter plus batch index.
				statement.set_plain_data(vec![
					counter as u8,
					(counter >> 8) as u8,
					(counter >> 16) as u8,
					i as u8,
				]);
				statements.push(statement);
				counter = counter.wrapping_add(1);
			}

			handler.on_statements(peer_id, statements);

			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
		}

		let reports = network.get_reports();
		assert!(
			!reports
				.iter()
				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
			"Legitimate traffic should not trigger flooding detection. Reports: {:?}",
			reports
		);

		let disconnected = network.get_disconnected_peers();
		assert!(
			!disconnected.contains(&peer_id),
			"Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
			disconnected
		);

		assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
	}
2714
2715 #[tokio::test]
2716 async fn test_just_over_rate_limit_triggers_flooding() {
2717 let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2718 build_handler(1);
2719
2720 let peer_id = *handler.peers.keys().next().unwrap();
2721
2722 let mut statements = Vec::new();
2723 for i in 0..260_000 {
2724 let mut statement = Statement::new();
2725 statement.set_plain_data(vec![
2726 i as u8,
2727 (i >> 8) as u8,
2728 (i >> 16) as u8,
2729 (i >> 24) as u8,
2730 ]);
2731 statements.push(statement);
2732 }
2733
2734 handler.on_statements(peer_id, statements);
2735
2736 let reports = network.get_reports();
2737 let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2738 assert!(
2739 reports
2740 .iter()
2741 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2742 "Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2743 expected_burst,
2744 reports
2745 );
2746
2747 let disconnected = network.get_disconnected_peers();
2748 assert!(
2749 disconnected.contains(&peer_id),
2750 "Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2751 disconnected
2752 );
2753
2754 dispatch_disconnects(&mut handler, &network).await;
2755
2756 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2757 }
2758
2759 #[tokio::test]
2760 async fn test_burst_of_250k_statements_allowed() {
2761 let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
2762 build_handler(1);
2763
2764 let peer_id = *handler.peers.keys().next().unwrap();
2765
2766 let mut statements = Vec::new();
2767 for i in 0..250_000 {
2768 let mut statement = Statement::new();
2769 statement.set_plain_data(vec![
2770 i as u8,
2771 (i >> 8) as u8,
2772 (i >> 16) as u8,
2773 (i >> 24) as u8,
2774 ]);
2775 statements.push(statement);
2776 }
2777
2778 handler.on_statements(peer_id, statements);
2779
2780 let reports = network.get_reports();
2781 assert!(
2782 !reports
2783 .iter()
2784 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2785 "250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2786 reports
2787 );
2788
2789 assert!(
2790 handler.peers.contains_key(&peer_id),
2791 "Peer should still be connected after 250k burst"
2792 );
2793 }
2794
	#[tokio::test]
	async fn test_sustained_rate_above_limit_triggers_flooding() {
		// A peer pushing 30k statements per 100ms (~300k/sec) will first drain the
		// burst allowance, then must be flagged for flooding once sustained.
		let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
			build_handler(1);

		let peer_id = *handler.peers.keys().next().unwrap();

		let mut counter = 0u32;

		// Hard wall-clock cap so the test cannot spin forever if detection breaks.
		let start = std::time::Instant::now();
		let duration = std::time::Duration::from_secs(5);

		let mut flooding_detected = false;
		while start.elapsed() < duration {
			let mut statements = Vec::new();
			for i in 0..30_000 {
				let mut statement = Statement::new();
				// Unique payload per statement: global counter plus batch index.
				statement.set_plain_data(vec![
					counter as u8,
					(counter >> 8) as u8,
					(counter >> 16) as u8,
					i as u8,
				]);
				statements.push(statement);
				counter = counter.wrapping_add(1);
			}

			handler.on_statements(peer_id, statements);

			// Stop as soon as the flooding report shows up; it may take a few
			// batches for the burst allowance to be exhausted.
			let reports = network.get_reports();
			if reports
				.iter()
				.any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
			{
				flooding_detected = true;
				break;
			}

			tokio::time::sleep(std::time::Duration::from_millis(100)).await;
		}

		assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");

		let disconnected = network.get_disconnected_peers();
		assert!(
			disconnected.contains(&peer_id),
			"Peer should be disconnected after sustained high rate. Disconnected: {:?}",
			disconnected
		);

		// Deliver the disconnect back to the handler and check cleanup.
		dispatch_disconnects(&mut handler, &network).await;

		assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
	}
2850
2851 #[tokio::test]
2852 async fn test_v2_peer_detected_when_no_fallback() {
2853 let (mut handler, _statement_store, _network, _notification_service) =
2854 build_handler_no_peers();
2855
2856 let peer_id = PeerId::random();
2857
2858 handler
2860 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
2861 peer: peer_id,
2862 direction: sc_network::service::traits::Direction::Inbound,
2863 handshake: vec![],
2864 negotiated_fallback: None,
2865 })
2866 .await;
2867
2868 assert_eq!(
2869 handler.peers.get(&peer_id).unwrap().protocol_version,
2870 PeerProtocolVersion::V2,
2871 "Peer should be detected as v2 when no fallback is negotiated"
2872 );
2873 }
2874
2875 #[tokio::test]
2876 async fn test_v1_peer_detected_when_fallback_negotiated() {
2877 let (mut handler, _statement_store, _network, _notification_service) =
2878 build_handler_no_peers();
2879
2880 let peer_id = PeerId::random();
2881
2882 handler
2884 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
2885 peer: peer_id,
2886 direction: sc_network::service::traits::Direction::Inbound,
2887 handshake: vec![],
2888 negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2889 })
2890 .await;
2891
2892 assert_eq!(
2893 handler.peers.get(&peer_id).unwrap().protocol_version,
2894 PeerProtocolVersion::V1,
2895 "Peer should be detected as v1 when fallback is negotiated"
2896 );
2897 }
2898
2899 #[tokio::test]
2900 async fn test_v1_peer_decodes_raw_statements() {
2901 let (mut handler, _statement_store, _network, _notification_service) =
2902 build_handler_no_peers();
2903
2904 let peer_id = PeerId::random();
2905 let (queue_sender, queue_receiver) = async_channel::bounded(10);
2906 handler.queue_sender = queue_sender;
2907
2908 handler
2910 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
2911 peer: peer_id,
2912 direction: sc_network::service::traits::Direction::Inbound,
2913 handshake: vec![],
2914 negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
2915 })
2916 .await;
2917
2918 let mut statement = Statement::new();
2920 statement.set_plain_data(b"v1 statement".to_vec());
2921 let hash = statement.hash();
2922 let raw_encoded = vec![statement].encode();
2923
2924 handler
2925 .handle_notification_event(NotificationEvent::NotificationReceived {
2926 peer: peer_id,
2927 notification: raw_encoded.into(),
2928 })
2929 .await;
2930
2931 let (received, _) = queue_receiver.try_recv().unwrap();
2932 assert_eq!(received.hash(), hash, "V1 peer's raw statement should be decoded correctly");
2933 }
2934
2935 #[tokio::test]
2936 async fn test_v2_peer_decodes_statement_message() {
2937 let (mut handler, _statement_store, _network, _notification_service) =
2938 build_handler_no_peers();
2939
2940 let peer_id = PeerId::random();
2941 let (queue_sender, queue_receiver) = async_channel::bounded(10);
2942 handler.queue_sender = queue_sender;
2943
2944 handler
2946 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
2947 peer: peer_id,
2948 direction: sc_network::service::traits::Direction::Inbound,
2949 handshake: vec![],
2950 negotiated_fallback: None,
2951 })
2952 .await;
2953
2954 let mut statement = Statement::new();
2956 statement.set_plain_data(b"v2 statement".to_vec());
2957 let hash = statement.hash();
2958 let msg = StatementMessage::Statements(vec![statement]);
2959 let encoded = msg.encode();
2960
2961 handler
2962 .handle_notification_event(NotificationEvent::NotificationReceived {
2963 peer: peer_id,
2964 notification: encoded.into(),
2965 })
2966 .await;
2967
2968 let (received, _) = queue_receiver.try_recv().unwrap();
2969 assert_eq!(received.hash(), hash, "V2 peer's StatementMessage should be decoded correctly");
2970 }
2971
	#[tokio::test]
	async fn test_v2_peer_topic_affinity_stored() {
		// An ExplicitTopicAffinity message from a v2 peer must be recorded on that
		// peer's state once pending affinities are processed.
		let (mut handler, _statement_store, _network, _notification_service) =
			build_handler_no_peers();

		let peer_id = PeerId::random();

		// No fallback negotiated -> peer is registered as v2.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		assert!(
			handler.peers.get(&peer_id).unwrap().topic_affinity.is_none(),
			"Topic affinity should be None initially"
		);

		// Build a bloom filter containing a single topic and send it as an
		// explicit affinity announcement.
		let topic: [u8; 32] = [0xAA; 32];
		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
		filter.insert(&topic);
		let msg = StatementMessage::ExplicitTopicAffinity(filter);
		let encoded = msg.encode();

		handler
			.handle_notification_event(NotificationEvent::NotificationReceived {
				peer: peer_id,
				notification: encoded.into(),
			})
			.await;

		// Affinity updates are staged; this applies them to the peer entry.
		handler.process_pending_affinities();

		let peer_data = handler.peers.get(&peer_id).unwrap();
		assert!(
			peer_data.topic_affinity.is_some(),
			"Topic affinity should be set after receiving ExplicitTopicAffinity"
		);
		assert!(
			peer_data.topic_affinity.as_ref().unwrap().contains(&topic),
			"Stored affinity filter should match the topic"
		);
	}
3022
	#[tokio::test]
	async fn test_topic_affinity_filters_propagation() {
		// Once a v2 peer announces a topic affinity, routine propagation must send
		// only statements that match the affinity or carry no topic at all.
		let (mut handler, statement_store, _network, notification_service) =
			build_handler_no_peers();

		let peer_id = PeerId::random();

		// No fallback negotiated -> peer is registered as v2.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		// Announce affinity for topic 0xAA only.
		let topic_aa: [u8; 32] = [0xAA; 32];
		let topic_bb: [u8; 32] = [0xBB; 32];
		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
		filter.insert(&topic_aa);
		let msg = StatementMessage::ExplicitTopicAffinity(filter);
		let encoded = msg.encode();
		handler
			.handle_notification_event(NotificationEvent::NotificationReceived {
				peer: peer_id,
				notification: encoded.into(),
			})
			.await;

		// Apply the staged affinity update to the peer entry.
		handler.process_pending_affinities();

		// Three statements: matching topic, non-matching topic, and topicless.
		let mut stmt_matching = Statement::new();
		stmt_matching.set_plain_data(b"matching".to_vec());
		stmt_matching.set_topic(0, topic_aa.into());
		let hash_matching = stmt_matching.hash();

		let mut stmt_not_matching = Statement::new();
		stmt_not_matching.set_plain_data(b"not matching".to_vec());
		stmt_not_matching.set_topic(0, topic_bb.into());
		let hash_not_matching = stmt_not_matching.hash();

		let mut stmt_no_topic = Statement::new();
		stmt_no_topic.set_plain_data(b"no topic".to_vec());
		let hash_no_topic = stmt_no_topic.hash();

		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_matching, stmt_matching);
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_not_matching, stmt_not_matching);
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_no_topic, stmt_no_topic);

		handler.propagate_statements().await;

		// V2 peers receive StatementMessage envelopes; unwrap them to get hashes.
		let sent = notification_service.get_sent_notifications();
		let mut sent_hashes: Vec<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
					StatementMessage::Statements(stmts) => stmts,
					_ => panic!("Expected StatementMessage::Statements"),
				}
			})
			.map(|s| s.hash())
			.collect();
		sent_hashes.sort();

		assert!(
			sent_hashes.contains(&hash_matching),
			"Statement matching topic affinity should be propagated"
		);
		assert!(
			sent_hashes.contains(&hash_no_topic),
			"Statement with no topics should be propagated (broadcast)"
		);
		assert!(
			!sent_hashes.contains(&hash_not_matching),
			"Statement NOT matching topic affinity should be filtered out"
		);
	}
3118
	#[tokio::test]
	async fn test_v1_peer_no_topic_filtering() {
		// V1 peers have no affinity mechanism, so propagation must send every
		// statement to them regardless of topics.
		let (mut handler, statement_store, _network, notification_service) =
			build_handler_no_peers();

		let peer_id = PeerId::random();

		// Negotiated fallback -> peer is registered as v1.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
			})
			.await;

		// One statement with a topic, one without.
		let topic_aa: [u8; 32] = [0xAA; 32];
		let mut stmt_with_topic = Statement::new();
		stmt_with_topic.set_plain_data(b"with topic".to_vec());
		stmt_with_topic.set_topic(0, topic_aa.into());
		let hash_with_topic = stmt_with_topic.hash();

		let mut stmt_no_topic = Statement::new();
		stmt_no_topic.set_plain_data(b"no topic".to_vec());
		let hash_no_topic = stmt_no_topic.hash();

		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_with_topic, stmt_with_topic);
		statement_store
			.recent_statements
			.lock()
			.unwrap()
			.insert(hash_no_topic, stmt_no_topic);

		handler.propagate_statements().await;

		// V1 wire format is a raw Vec<Statement> (no envelope).
		let sent = notification_service.get_sent_notifications();
		let sent_hashes: Vec<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
			})
			.map(|s| s.hash())
			.collect();

		assert_eq!(
			sent_hashes.len(),
			2,
			"V1 peer should receive all statements regardless of topics"
		);
		assert!(sent_hashes.contains(&hash_with_topic));
		assert!(sent_hashes.contains(&hash_no_topic));
	}
3177
	#[tokio::test]
	async fn test_affinity_change_triggers_resync() {
		// For a "light" v2 peer, initial sync is deferred until it announces a
		// topic affinity, and every subsequent affinity CHANGE must trigger a full
		// re-sync under the new filter (previously-sent statements included).
		let (mut handler, statement_store, _network, notification_service) =
			build_handler_no_peers_light();

		let peer_id = PeerId::random();

		// Three statements in the store: topic 0xAA, topic 0xBB, and topicless.
		let topic_aa: [u8; 32] = [0xAA; 32];
		let topic_bb: [u8; 32] = [0xBB; 32];

		let mut stmt_aa = Statement::new();
		stmt_aa.set_plain_data(b"stmt_aa".to_vec());
		stmt_aa.set_topic(0, topic_aa.into());
		let hash_aa = stmt_aa.hash();

		let mut stmt_bb = Statement::new();
		stmt_bb.set_plain_data(b"stmt_bb".to_vec());
		stmt_bb.set_topic(0, topic_bb.into());
		let hash_bb = stmt_bb.hash();

		let mut stmt_no_topic = Statement::new();
		stmt_no_topic.set_plain_data(b"no topic".to_vec());
		let hash_no_topic = stmt_no_topic.hash();

		statement_store.statements.lock().unwrap().insert(hash_aa, stmt_aa);
		statement_store.statements.lock().unwrap().insert(hash_bb, stmt_bb);
		statement_store.statements.lock().unwrap().insert(hash_no_topic, stmt_no_topic);

		// No fallback negotiated -> v2 peer.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		// Light peers wait for an affinity announcement before syncing.
		assert!(
			!handler.pending_initial_syncs.contains_key(&peer_id),
			"Light V2 peer should NOT have initial sync scheduled on connect"
		);

		// Phase 1: announce affinity for topic 0xAA.
		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
		filter.insert(&topic_aa);
		let msg = StatementMessage::ExplicitTopicAffinity(filter);
		let encoded = msg.encode();
		handler
			.handle_notification_event(NotificationEvent::NotificationReceived {
				peer: peer_id,
				notification: encoded.into(),
			})
			.await;

		// Apply the staged affinity; this should schedule the deferred sync.
		handler.process_pending_affinities();

		assert!(
			handler.pending_initial_syncs.contains_key(&peer_id),
			"Initial sync should be scheduled after setting affinity"
		);

		// Drain the sync bursts for this peer.
		while handler.pending_initial_syncs.contains_key(&peer_id) {
			handler.process_initial_sync_burst().await;
		}

		// Under the 0xAA filter: matching + topicless are sent, 0xBB is filtered.
		let sent = notification_service.get_sent_notifications();
		let sent_hashes: HashSet<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
					StatementMessage::Statements(stmts) => stmts,
					_ => panic!("Expected StatementMessage::Statements"),
				}
			})
			.map(|s| s.hash())
			.collect();
		assert!(sent_hashes.contains(&hash_aa), "stmt_aa should be sent (matches affinity)");
		assert!(
			sent_hashes.contains(&hash_no_topic),
			"stmt_no_topic should be sent (broadcast, no topic)"
		);
		assert!(!sent_hashes.contains(&hash_bb), "stmt_bb should NOT be sent (filtered)");

		// Phase 2: change the affinity to topic 0xBB.
		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
		filter.insert(&topic_bb);
		let msg = StatementMessage::ExplicitTopicAffinity(filter);
		let encoded = msg.encode();
		handler
			.handle_notification_event(NotificationEvent::NotificationReceived {
				peer: peer_id,
				notification: encoded.into(),
			})
			.await;

		handler.process_pending_affinities();

		assert!(
			handler.pending_initial_syncs.contains_key(&peer_id),
			"Initial sync should be re-scheduled after affinity change"
		);

		// Clear the log so only the re-sync traffic is inspected below.
		notification_service.clear_sent_notifications();
		while handler.pending_initial_syncs.contains_key(&peer_id) {
			handler.process_initial_sync_burst().await;
		}

		let sent_after_bb = notification_service.get_sent_notifications();
		let sent_hashes_bb: HashSet<_> = sent_after_bb
			.iter()
			.flat_map(|(_, notification)| {
				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
					StatementMessage::Statements(stmts) => stmts,
					_ => panic!("Expected StatementMessage::Statements"),
				}
			})
			.map(|s| s.hash())
			.collect();
		assert!(
			sent_hashes_bb.contains(&hash_bb),
			"stmt_bb should now be sent after affinity changed to topic_bb"
		);
		// The topicless statement was already delivered in phase 1; re-sending it
		// proves the peer's known_statements set was reset on affinity change.
		assert!(
			sent_hashes_bb.contains(&hash_no_topic),
			"stmt_no_topic should be re-sent (known_statements cleared on affinity change)"
		);
	}
3313
	/// A peer's affinity-filter update must cause statements that were
	/// previously filtered out to be (re-)sent.
	///
	/// Flow: a V2 light peer connects and announces affinity for `topic_aa`
	/// only; the initial sync must deliver `stmt_aa` but not `stmt_bb`. After
	/// the peer widens its filter to also cover `topic_bb`, the re-scheduled
	/// sync must deliver `stmt_bb` — and `stmt_aa` again, since the handler
	/// clears `known_statements` on an affinity change.
	#[tokio::test]
	async fn test_affinity_change_sends_previously_filtered_statements() {
		let (mut handler, statement_store, _network, notification_service) =
			build_handler_no_peers_light();

		let peer_id = PeerId::random();

		// Two distinct topics; each statement is tagged with exactly one.
		let topic_aa: [u8; 32] = [0xAA; 32];
		let topic_bb: [u8; 32] = [0xBB; 32];

		let mut stmt_aa = Statement::new();
		stmt_aa.set_plain_data(b"stmt_aa".to_vec());
		stmt_aa.set_topic(0, topic_aa.into());
		let hash_aa = stmt_aa.hash();

		let mut stmt_bb = Statement::new();
		stmt_bb.set_plain_data(b"stmt_bb".to_vec());
		stmt_bb.set_topic(0, topic_bb.into());
		let hash_bb = stmt_bb.hash();

		// Present in the full store (served by initial sync)...
		statement_store.statements.lock().unwrap().insert(hash_aa, stmt_aa.clone());
		statement_store.statements.lock().unwrap().insert(hash_bb, stmt_bb.clone());

		// ...and in the recent buffer (served by propagation).
		statement_store.recent_statements.lock().unwrap().insert(hash_aa, stmt_aa);
		statement_store.recent_statements.lock().unwrap().insert(hash_bb, stmt_bb);

		// Open a V2 stream (no negotiated fallback).
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: peer_id,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		// Peer announces affinity for topic_aa only.
		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
		filter.insert(&topic_aa);
		let msg = StatementMessage::ExplicitTopicAffinity(filter);
		let encoded = msg.encode();
		handler
			.handle_notification_event(NotificationEvent::NotificationReceived {
				peer: peer_id,
				notification: encoded.into(),
			})
			.await;

		handler.process_pending_affinities();

		// Drain the scheduled initial sync completely.
		while handler.pending_initial_syncs.contains_key(&peer_id) {
			handler.process_initial_sync_burst().await;
		}

		// Decode every sent notification back into statement hashes.
		let sent = notification_service.get_sent_notifications();
		let sent_hashes: HashSet<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
					StatementMessage::Statements(stmts) => stmts,
					_ => panic!("Expected StatementMessage::Statements"),
				}
			})
			.map(|s| s.hash())
			.collect();
		assert!(sent_hashes.contains(&hash_aa), "stmt_aa should be sent (matches affinity)");
		assert!(
			!sent_hashes.contains(&hash_bb),
			"stmt_bb should NOT be sent (filtered by affinity)"
		);

		// Propagation must honor the same filter for known_statements bookkeeping.
		handler.propagate_statements().await;

		let peer = handler.peers.get(&peer_id).unwrap();
		assert!(
			!peer.known_statements.contains(&hash_bb),
			"stmt_bb should NOT be in known_statements (filtered by affinity)"
		);
		assert!(peer.known_statements.contains(&hash_aa), "stmt_aa should be in known_statements");

		// Peer now widens its affinity to cover both topics.
		let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
		filter.insert(&topic_aa);
		filter.insert(&topic_bb);
		let msg = StatementMessage::ExplicitTopicAffinity(filter);
		let encoded = msg.encode();

		notification_service.clear_sent_notifications();
		handler
			.handle_notification_event(NotificationEvent::NotificationReceived {
				peer: peer_id,
				notification: encoded.into(),
			})
			.await;

		handler.process_pending_affinities();

		// Drain the re-scheduled initial sync triggered by the affinity change.
		while handler.pending_initial_syncs.contains_key(&peer_id) {
			handler.process_initial_sync_burst().await;
		}

		let sent = notification_service.get_sent_notifications();
		let sent_hashes: HashSet<_> = sent
			.iter()
			.flat_map(|(_, notification)| {
				match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
					StatementMessage::Statements(stmts) => stmts,
					_ => panic!("Expected StatementMessage::Statements"),
				}
			})
			.map(|s| s.hash())
			.collect();
		assert!(
			sent_hashes.contains(&hash_bb),
			"stmt_bb should now be sent after affinity expanded to include topic_bb"
		);
		assert!(
			sent_hashes.contains(&hash_aa),
			"stmt_aa should be re-sent (known_statements cleared on affinity change)"
		);
	}
3448
3449 #[test]
3450 fn test_encode_statement_refs_matches_derive_encoding() {
3451 let mut stmt1 = Statement::new();
3452 stmt1.set_plain_data(b"first".to_vec());
3453 let mut stmt2 = Statement::new();
3454 stmt2.set_plain_data(b"second".to_vec());
3455
3456 let refs: Vec<&Statement> = vec![&stmt1, &stmt2];
3457 let hand_rolled = StatementMessage::encode_statement_refs(&refs);
3458 let derive_encoded = StatementMessage::Statements(vec![stmt1, stmt2]).encode();
3459
3460 assert_eq!(
3461 hand_rolled, derive_encoded,
3462 "encode_statement_refs must produce identical bytes to derive Encode"
3463 );
3464 }
3465
3466 #[test]
3467 fn test_encode_statement_refs_empty() {
3468 let refs: Vec<&Statement> = vec![];
3469 let hand_rolled = StatementMessage::encode_statement_refs(&refs);
3470 let derive_encoded = StatementMessage::Statements(vec![]).encode();
3471
3472 assert_eq!(hand_rolled, derive_encoded);
3473 }
3474
3475 #[test]
3476 fn test_can_receive_all_combinations() {
3477 let make_peer = |is_light: bool, version: PeerProtocolVersion, has_affinity: bool| {
3478 let topic_affinity = has_affinity.then(|| AffinityFilter::new(BLOOM_SEED, 0.01, 10));
3479 Peer {
3480 known_statements: LruHashSet::new(NonZeroUsize::new(10).unwrap()),
3481 rate_limiter: PeerRateLimiter::new(
3482 NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).expect("nonzero"),
3483 NonZeroU32::new(
3484 DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
3485 )
3486 .expect("nonzero"),
3487 ),
3488 protocol_version: version,
3489 topic_affinity,
3490 is_light,
3491 pending_topic_affinity: None,
3492 }
3493 };
3494
3495 assert!(make_peer(false, PeerProtocolVersion::V1, false).can_receive());
3497 assert!(make_peer(false, PeerProtocolVersion::V2, false).can_receive());
3499 assert!(make_peer(true, PeerProtocolVersion::V1, false).can_receive());
3501 assert!(!make_peer(true, PeerProtocolVersion::V2, false).can_receive());
3503 assert!(make_peer(true, PeerProtocolVersion::V2, true).can_receive());
3505 assert!(make_peer(false, PeerProtocolVersion::V2, true).can_receive());
3507 }
3508
	/// `send_statement_chunk` must encode per the peer's negotiated protocol:
	/// a raw `Vec<Statement>` for V1 peers, and the `StatementMessage`
	/// envelope for V2 peers — and the two byte streams must differ.
	#[tokio::test]
	async fn test_send_chunk_v1_vs_v2_encoding() {
		let (mut handler, _statement_store, _network, notification_service) =
			build_handler_no_peers();

		let v1_peer = PeerId::random();
		let v2_peer = PeerId::random();

		// A V1 fallback in the handshake marks this peer as legacy-protocol.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: v1_peer,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
			})
			.await;

		// No fallback => this peer speaks the current (V2) protocol.
		handler
			.handle_notification_event(NotificationEvent::NotificationStreamOpened {
				peer: v2_peer,
				direction: sc_network::service::traits::Direction::Inbound,
				handshake: vec![],
				negotiated_fallback: None,
			})
			.await;

		let mut stmt = Statement::new();
		stmt.set_plain_data(b"encoding test".to_vec());

		// Send to the V1 peer: payload must decode as a bare `Vec<Statement>`.
		notification_service.clear_sent_notifications();
		handler.send_statement_chunk(&v1_peer, &[&stmt]).await;
		let v1_sent = notification_service.get_sent_notifications();
		assert_eq!(v1_sent.len(), 1);
		let v1_bytes = &v1_sent[0].1;
		let decoded_v1 = <Statements as Decode>::decode(&mut v1_bytes.as_slice())
			.expect("V1 peer should receive raw Vec<Statement> encoding");
		assert_eq!(decoded_v1.len(), 1);

		// Send to the V2 peer: payload must decode as a `StatementMessage`.
		notification_service.clear_sent_notifications();
		handler.send_statement_chunk(&v2_peer, &[&stmt]).await;
		let v2_sent = notification_service.get_sent_notifications();
		assert_eq!(v2_sent.len(), 1);
		let v2_bytes = &v2_sent[0].1;
		let decoded_v2 = StatementMessage::decode(&mut v2_bytes.as_slice())
			.expect("V2 peer should receive StatementMessage encoding");
		match decoded_v2 {
			StatementMessage::Statements(stmts) => assert_eq!(stmts.len(), 1),
			_ => panic!("Expected StatementMessage::Statements for V2 peer"),
		}

		// The V2 envelope adds framing, so the raw bytes cannot be identical.
		assert_ne!(v1_bytes, v2_bytes, "V1 and V2 encodings should differ");
	}
3568
3569 #[tokio::test]
3570 async fn test_schedule_initial_sync_replaces_existing() {
3571 let (mut handler, statement_store, _network, _notification_service) =
3572 build_handler_no_peers();
3573
3574 let peer_id = PeerId::random();
3575
3576 let mut stmt1 = Statement::new();
3578 stmt1.set_plain_data(b"stmt1".to_vec());
3579 let hash1 = stmt1.hash();
3580 statement_store.statements.lock().unwrap().insert(hash1, stmt1);
3581
3582 handler
3584 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
3585 peer: peer_id,
3586 direction: sc_network::service::traits::Direction::Inbound,
3587 handshake: vec![],
3588 negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
3589 })
3590 .await;
3591
3592 assert!(handler.pending_initial_syncs.contains_key(&peer_id));
3594 assert_eq!(
3595 handler.initial_sync_peer_queue.iter().filter(|p| **p == peer_id).count(),
3596 1,
3597 "Peer should appear exactly once in the queue"
3598 );
3599
3600 let mut stmt2 = Statement::new();
3602 stmt2.set_plain_data(b"stmt2".to_vec());
3603 let hash2 = stmt2.hash();
3604 statement_store.statements.lock().unwrap().insert(hash2, stmt2);
3605
3606 handler.schedule_initial_sync_for_peer(peer_id);
3607
3608 assert_eq!(
3610 handler.initial_sync_peer_queue.iter().filter(|p| **p == peer_id).count(),
3611 1,
3612 "Peer should NOT be duplicated in the queue after re-schedule"
3613 );
3614 let pending = handler.pending_initial_syncs.get(&peer_id).unwrap();
3616 assert!(pending.hashes.contains(&hash1));
3617 assert!(pending.hashes.contains(&hash2));
3618 }
3619
	/// An initial sync scheduled while the node is in major (chain) sync must
	/// stay queued — `process_initial_sync_burst` must not consume it — and
	/// must be processed once major sync ends.
	#[tokio::test]
	async fn test_initial_sync_queued_during_major_sync_processed_after() {
		let statement_store = TestStatementStore::new();
		let (queue_sender, _queue_receiver) = async_channel::bounded(2);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let sync = TestSync::new();
		// Start the test with the node reporting "major syncing".
		sync.major_syncing.store(true, Ordering::Relaxed);

		// Hand-built handler: all background streams are pending so only the
		// explicitly driven methods run.
		let mut handler = StatementHandler {
			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
			notification_service: Box::new(notification_service.clone()),
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync: sync.clone(),
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers: HashMap::new(),
			statement_store: Arc::new(statement_store.clone()),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_affinities_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
		};

		// One stored statement so the initial sync has content.
		let mut stmt = Statement::new();
		stmt.set_plain_data(b"during major sync".to_vec());
		let hash = stmt.hash();
		statement_store.statements.lock().unwrap().insert(hash, stmt);

		// Register a connected peer directly (bypassing stream events).
		let peer_id = PeerId::random();
		handler.peers.insert(
			peer_id,
			Peer::new_for_testing(
				LruHashSet::new(NonZeroUsize::new(100).unwrap()),
				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).unwrap(),
				NonZeroU32::new(
					DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
				)
				.unwrap(),
			),
		);

		handler.schedule_initial_sync_for_peer(peer_id);

		// Scheduling is allowed during major sync; only processing is gated.
		assert!(
			handler.pending_initial_syncs.contains_key(&peer_id),
			"Initial sync should be queued even during major sync"
		);
		assert_eq!(handler.initial_sync_peer_queue.len(), 1);

		handler.process_initial_sync_burst().await;
		assert!(
			handler.pending_initial_syncs.contains_key(&peer_id),
			"Pending sync should remain untouched during major sync"
		);

		// Major sync ends: the next burst must drain the queue.
		sync.major_syncing.store(false, Ordering::Relaxed);
		handler.process_initial_sync_burst().await;
		assert!(
			handler.initial_sync_peer_queue.is_empty(),
			"Peer should have been processed after major sync ended"
		);
	}
3703
	/// `schedule_initial_sync_for_peer` must include ALL stored statements —
	/// even ones the peer already knew — and must clear the peer's
	/// `known_statements` set so everything is eligible for re-sending.
	#[tokio::test]
	async fn test_schedule_initial_sync_resends_all_matching() {
		let (mut handler, statement_store, _network, _notification_service) =
			build_handler_no_peers();

		let peer_id = PeerId::random();

		// Two statements: one the peer will already "know", one it won't.
		let mut stmt1 = Statement::new();
		stmt1.set_plain_data(b"known".to_vec());
		let hash1 = stmt1.hash();
		let mut stmt2 = Statement::new();
		stmt2.set_plain_data(b"unknown".to_vec());
		let hash2 = stmt2.hash();

		statement_store.statements.lock().unwrap().insert(hash1, stmt1);
		statement_store.statements.lock().unwrap().insert(hash2, stmt2);

		// Insert the peer directly with hash1 pre-marked as known.
		let mut known = LruHashSet::new(NonZeroUsize::new(100).unwrap());
		known.insert(hash1);
		handler.peers.insert(
			peer_id,
			Peer {
				known_statements: known,
				rate_limiter: PeerRateLimiter::new(
					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).unwrap(),
					NonZeroU32::new(
						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
					)
					.unwrap(),
				),
				protocol_version: PeerProtocolVersion::V1,
				topic_affinity: None,
				is_light: false,
				pending_topic_affinity: None,
			},
		);

		handler.schedule_initial_sync_for_peer(peer_id);

		// The pending sync must carry both hashes, known or not.
		let pending = handler.pending_initial_syncs.get(&peer_id).unwrap();
		assert!(
			pending.hashes.contains(&hash1),
			"Previously known hash should be included after affinity change"
		);
		assert!(pending.hashes.contains(&hash2), "Unknown hash should be included in initial sync");
		// And the peer's known set must have been reset.
		let peer_data = handler.peers.get(&peer_id).unwrap();
		assert!(
			!peer_data.known_statements.contains(&hash1),
			"known_statements should be cleared after schedule_initial_sync_for_peer"
		);
	}
3759
3760 #[tokio::test]
3761 async fn test_malformed_v2_message_does_not_panic() {
3762 let (mut handler, _statement_store, _network, _notification_service) =
3763 build_handler_no_peers();
3764
3765 let peer_id = PeerId::random();
3766
3767 handler
3769 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
3770 peer: peer_id,
3771 direction: sc_network::service::traits::Direction::Inbound,
3772 handshake: vec![],
3773 negotiated_fallback: None,
3774 })
3775 .await;
3776
3777 handler
3779 .handle_notification_event(NotificationEvent::NotificationReceived {
3780 peer: peer_id,
3781 notification: vec![0xFF, 0xFE, 0xFD].into(),
3782 })
3783 .await;
3784
3785 let mut stmt = Statement::new();
3787 stmt.set_plain_data(b"v1 encoded".to_vec());
3788 let v1_encoded = vec![stmt].encode();
3789 handler
3790 .handle_notification_event(NotificationEvent::NotificationReceived {
3791 peer: peer_id,
3792 notification: v1_encoded.into(),
3793 })
3794 .await;
3795
3796 assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
3798 }
3799
3800 #[test]
3801 fn test_find_sendable_chunk_v2_overhead() {
3802 let v1_max = max_statement_payload_size(V1_ENVELOPE_OVERHEAD);
3803 let v2_max = max_statement_payload_size(V2_ENVELOPE_OVERHEAD);
3804
3805 assert!(
3807 v2_max < v1_max,
3808 "V2 payload capacity ({v2_max}) should be less than V1 ({v1_max})"
3809 );
3810 assert_eq!(v1_max - v2_max, 1, "V2 overhead is exactly 1 byte more than V1");
3811
3812 let stmts: Vec<Statement> = (0..1000)
3814 .map(|i| {
3815 let mut s = Statement::new();
3816 s.set_plain_data(format!("stmt-{i}").into_bytes());
3817 s
3818 })
3819 .collect();
3820 let refs: Vec<&Statement> = stmts.iter().collect();
3821
3822 let v1_chunk = find_sendable_chunk(&refs, V1_ENVELOPE_OVERHEAD);
3823 let v2_chunk = find_sendable_chunk(&refs, V2_ENVELOPE_OVERHEAD);
3824
3825 let v1_count = match v1_chunk {
3827 ChunkResult::Send(n) => n,
3828 _ => panic!("Expected Send for V1"),
3829 };
3830 let v2_count = match v2_chunk {
3831 ChunkResult::Send(n) => n,
3832 _ => panic!("Expected Send for V2"),
3833 };
3834 assert!(
3835 v2_count <= v1_count,
3836 "V2 ({v2_count}) should fit at most as many statements as V1 ({v1_count})"
3837 );
3838 }
3839
3840 #[tokio::test]
3841 async fn test_full_node_v2_gets_initial_sync_immediately() {
3842 let (mut handler, statement_store, _network, _notification_service) =
3843 build_handler_no_peers();
3844
3845 let mut stmt = Statement::new();
3847 stmt.set_plain_data(b"full node v2".to_vec());
3848 let hash = stmt.hash();
3849 statement_store.statements.lock().unwrap().insert(hash, stmt);
3850
3851 let peer_id = PeerId::random();
3852
3853 handler
3855 .handle_notification_event(NotificationEvent::NotificationStreamOpened {
3856 peer: peer_id,
3857 direction: sc_network::service::traits::Direction::Inbound,
3858 handshake: vec![],
3859 negotiated_fallback: None,
3860 })
3861 .await;
3862
3863 assert!(
3865 handler.pending_initial_syncs.contains_key(&peer_id),
3866 "Full-node V2 peer should have initial sync scheduled immediately"
3867 );
3868 assert_eq!(handler.peers.get(&peer_id).unwrap().protocol_version, PeerProtocolVersion::V2);
3869 assert!(!handler.peers.get(&peer_id).unwrap().is_light);
3870 }
3871
3872 #[tokio::test]
3873 async fn test_propagation_reaches_all_connected_peers() {
3874 let (
3875 mut handler,
3876 statement_store,
3877 _network,
3878 notification_service,
3879 _queue_receiver,
3880 peer_ids,
3881 ) = build_handler(5);
3882
3883 let mut expected_hashes = Vec::new();
3885 for i in 0..3u8 {
3886 let mut statement = Statement::new();
3887 statement.set_plain_data(vec![i; 100]);
3888 let hash = statement.hash();
3889 expected_hashes.push(hash);
3890 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
3891 }
3892 expected_hashes.sort();
3893
3894 handler.propagate_statements().await;
3895
3896 let sent = notification_service.get_sent_notifications();
3897
3898 for peer_id in &peer_ids {
3900 let mut received_hashes = get_peer_hashes(&sent, *peer_id);
3901 received_hashes.sort();
3902
3903 assert_eq!(
3904 received_hashes, expected_hashes,
3905 "Peer {peer_id} should have received all 3 statements"
3906 );
3907 }
3908
3909 assert!(statement_store.recent_statements.lock().unwrap().is_empty());
3911 }
3912
3913 #[tokio::test]
3914 async fn test_known_statement_filtering_per_peer() {
3915 let (
3916 mut handler,
3917 statement_store,
3918 _network,
3919 notification_service,
3920 _queue_receiver,
3921 peer_ids,
3922 ) = build_handler(3);
3923
3924 let peer_a = peer_ids[0];
3925 let peer_b = peer_ids[1];
3926 let peer_c = peer_ids[2];
3927
3928 let mut hashes = Vec::new();
3930 for i in 0..5u8 {
3931 let mut statement = Statement::new();
3932 statement.set_plain_data(vec![i; 100]);
3933 let hash = statement.hash();
3934 hashes.push(hash);
3935 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
3936 }
3937
3938 handler.peers.get_mut(&peer_a).unwrap().known_statements.insert(hashes[0]);
3940 handler.peers.get_mut(&peer_a).unwrap().known_statements.insert(hashes[1]);
3941 handler.peers.get_mut(&peer_b).unwrap().known_statements.insert(hashes[2]);
3942
3943 handler.propagate_statements().await;
3944
3945 let sent = notification_service.get_sent_notifications();
3946
3947 let peer_a_hashes = get_peer_hashes(&sent, peer_a);
3948 let peer_b_hashes = get_peer_hashes(&sent, peer_b);
3949 let peer_c_hashes = get_peer_hashes(&sent, peer_c);
3950
3951 assert_eq!(peer_a_hashes.len(), 3, "peer_a should get 3 statements");
3953 assert!(!peer_a_hashes.contains(&hashes[0]), "peer_a already knows s1");
3954 assert!(!peer_a_hashes.contains(&hashes[1]), "peer_a already knows s2");
3955 assert!(peer_a_hashes.contains(&hashes[2]));
3956 assert!(peer_a_hashes.contains(&hashes[3]));
3957 assert!(peer_a_hashes.contains(&hashes[4]));
3958
3959 assert_eq!(peer_b_hashes.len(), 4, "peer_b should get 4 statements");
3961 assert!(!peer_b_hashes.contains(&hashes[2]), "peer_b already knows s3");
3962 assert!(peer_b_hashes.contains(&hashes[0]));
3963 assert!(peer_b_hashes.contains(&hashes[1]));
3964 assert!(peer_b_hashes.contains(&hashes[3]));
3965 assert!(peer_b_hashes.contains(&hashes[4]));
3966
3967 let mut sorted_peer_c: Vec<_> = peer_c_hashes.into_iter().collect();
3969 sorted_peer_c.sort();
3970 let mut all_hashes = hashes.clone();
3971 all_hashes.sort();
3972 assert_eq!(sorted_peer_c, all_hashes, "peer_c should get all 5 statements");
3973 }
3974
	/// While major sync is in progress, sync-peer connections are buffered in
	/// `deferred_peers` (no reserved-set calls are made), and a disconnect
	/// removes the peer from the buffer without a network remove call.
	#[test]
	fn major_sync_defers_peers_and_handles_disconnect() {
		// Sync reports "major syncing" for the whole test.
		let (sync, _flag) = TestSync::with_syncing(true);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let statement_store = TestStatementStore::new();
		let (queue_sender, _queue_receiver) = async_channel::bounded(100);

		// Hand-built handler with all background streams pending.
		let mut handler = StatementHandler {
			protocol_name: "/statement/1".into(),
			notification_service: Box::new(notification_service),
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync,
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers: HashMap::new(),
			statement_store: Arc::new(statement_store),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_affinities_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		};

		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let peer3 = PeerId::random();

		// Three peers connect during major sync.
		handler.handle_sync_event(SyncEvent::PeerConnected(peer1));
		handler.handle_sync_event(SyncEvent::PeerConnected(peer2));
		handler.handle_sync_event(SyncEvent::PeerConnected(peer3));

		// No reserved-set traffic yet — everything is buffered.
		assert!(network.get_added_reserved().is_empty());
		assert!(network.get_removed_reserved().is_empty());
		assert_eq!(handler.deferred_peers.len(), 3);

		// A buffered peer disconnecting just shrinks the buffer.
		handler.handle_sync_event(SyncEvent::PeerDisconnected(peer1));
		assert_eq!(handler.deferred_peers.len(), 2);
		assert!(!handler.deferred_peers.contains(&peer1), "disconnected peer must leave buffer");
		assert!(handler.deferred_peers.contains(&peer2));
		assert!(handler.deferred_peers.contains(&peer3));
		assert!(network.get_removed_reserved().is_empty(), "no remove call for buffered peer");
	}
4035
	/// Once major sync ends, `drain_deferred_peers` must flush every buffered
	/// peer to the reserved set in a single add call — with no corresponding
	/// remove call.
	#[test]
	fn deferred_peers_flushed_on_sync_end_without_remove() {
		// Start in "major syncing"; `flag` lets the test flip it off later.
		let (sync, flag) = TestSync::with_syncing(true);
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let statement_store = TestStatementStore::new();
		let (queue_sender, _queue_receiver) = async_channel::bounded(100);

		// Pre-populate the deferred buffer with two peers.
		let peer1 = PeerId::random();
		let peer2 = PeerId::random();
		let mut deferred = HashSet::new();
		deferred.insert(peer1);
		deferred.insert(peer2);

		let mut handler = StatementHandler {
			protocol_name: "/statement/1".into(),
			notification_service: Box::new(notification_service),
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync,
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers: HashMap::new(),
			statement_store: Arc::new(statement_store),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_affinities_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: deferred,
			dropped_statements_during_sync: false,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(pending().fuse()),
		};

		// End major sync, then drain.
		flag.store(false, std::sync::atomic::Ordering::Relaxed);
		handler.drain_deferred_peers();

		assert!(handler.deferred_peers.is_empty());

		// Exactly one batched add containing both peers' p2p multiaddrs.
		let added = network.get_added_reserved();
		assert_eq!(added.len(), 1);
		let added_addrs = &added[0];
		let expected_addr1: sc_network::Multiaddr =
			iter::once(multiaddr::Protocol::P2p(peer1.into())).collect();
		let expected_addr2: sc_network::Multiaddr =
			iter::once(multiaddr::Protocol::P2p(peer2.into())).collect();
		assert!(added_addrs.contains(&expected_addr1), "peer1 must be in added set");
		assert!(added_addrs.contains(&expected_addr2), "peer2 must be in added set");

		// Flushing must never trigger a remove.
		assert!(network.get_removed_reserved().is_empty());
	}
4096
	/// Sync recovery (after statements were dropped during major sync) must:
	/// remove exactly one connected peer from the reserved set, remember it
	/// in `sync_recovery_peer`, re-add it via `try_readd_sync_recovery_peer`,
	/// and refuse to start a second recovery while one is in flight.
	#[tokio::test]
	async fn sync_recovery_schedules_remove_for_one_connected_peer() {
		let network = TestNetwork::new();
		let notification_service = TestNotificationService::new();
		let (sync, _flag) = TestSync::with_syncing(false);
		let (queue_sender, _) = async_channel::bounded(2);
		let statement_store = TestStatementStore::new();

		let connected_peer = PeerId::random();

		// One connected V1 full-node peer.
		let mut peers = HashMap::new();
		peers.insert(
			connected_peer,
			Peer {
				known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
				rate_limiter: PeerRateLimiter::new(
					NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
					NonZeroU32::new(
						DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
					)
					.expect("burst capacity is nonzero"),
				),
				protocol_version: PeerProtocolVersion::V1,
				topic_affinity: None,
				is_light: false,
				pending_topic_affinity: None,
			},
		);

		// `dropped_statements_during_sync: true` arms the recovery path.
		let mut handler = StatementHandler {
			protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
			notification_service: Box::new(notification_service),
			propagate_timeout: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network: network.clone(),
			sync,
			sync_event_stream: (Box::pin(futures::stream::pending())
				as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
				.fuse(),
			peers,
			statement_store: Arc::new(statement_store),
			queue_sender,
			statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			metrics: None,
			initial_sync_timeout: Box::pin(futures::future::pending()),
			pending_affinities_timeout: Box::pin(futures::future::pending()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
			deferred_peers: HashSet::new(),
			dropped_statements_during_sync: true,
			sync_recovery_peer: None,
			sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
		};

		handler.start_sync_recovery();

		// Exactly one remove call, targeting the single connected peer.
		{
			let removed = network.removed_reserved.lock().unwrap();
			assert_eq!(
				removed.len(),
				1,
				"Expected exactly one remove_peers_from_reserved_set call"
			);
			assert!(removed[0].contains(&connected_peer));
		}

		// The removed peer is remembered for later re-adding.
		assert_eq!(handler.sync_recovery_peer, Some(connected_peer));

		// Re-adding clears the recovery slot and issues one add call with
		// the peer's p2p multiaddr.
		handler.try_readd_sync_recovery_peer();
		assert!(handler.sync_recovery_peer.is_none());
		{
			let added = network.added_reserved.lock().unwrap();
			assert_eq!(added.len(), 1);
			let expected_addr: multiaddr::Multiaddr =
				iter::once(multiaddr::Protocol::P2p(connected_peer.into())).collect();
			assert!(added[0].contains(&expected_addr));
		}

		// Re-entry guard: with a recovery already in flight, a second
		// start_sync_recovery must be a no-op.
		{
			let peer2 = PeerId::random();
			handler.sync_recovery_peer = Some(peer2);
			handler.start_sync_recovery();
			assert_eq!(
				handler.sync_recovery_peer,
				Some(peer2),
				"Re-entry guard: recovery peer must not change on second call"
			);
			assert_eq!(
				network.removed_reserved.lock().unwrap().len(),
				1,
				"Re-entry guard: no extra remove call while recovery is in flight"
			);
		}
	}
4202
	/// `start_sync_recovery` must be a no-op unless statements were actually
	/// dropped during major sync (`dropped_statements_during_sync`).
	#[tokio::test]
	async fn sync_recovery_gated_by_dropped_statements_flag() {
		// Factory for a connected V1 full-node peer.
		let make_peer = || Peer {
			known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
			rate_limiter: PeerRateLimiter::new(
				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
				NonZeroU32::new(
					DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
				)
				.expect("burst capacity is nonzero"),
			),
			protocol_version: PeerProtocolVersion::V1,
			topic_affinity: None,
			is_light: false,
			pending_topic_affinity: None,
		};

		// Factory for a handler with one connected peer and a configurable
		// `dropped_statements_during_sync` flag.
		let make_handler =
			|network: TestNetwork, dropped: bool| -> StatementHandler<TestNetwork, TestSync> {
				let (sync, _) = TestSync::with_syncing(false);
				let (queue_sender, _) = async_channel::bounded(2);
				let mut peers = HashMap::new();
				peers.insert(PeerId::random(), make_peer());
				StatementHandler {
					protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
					notification_service: Box::new(TestNotificationService::new()),
					propagate_timeout: (Box::pin(futures::stream::pending())
						as Pin<Box<dyn Stream<Item = ()> + Send>>)
						.fuse(),
					pending_statements: FuturesUnordered::new(),
					pending_statements_peers: HashMap::new(),
					network,
					sync,
					sync_event_stream: (Box::pin(futures::stream::pending())
						as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
						.fuse(),
					peers,
					statement_store: Arc::new(TestStatementStore::new()),
					queue_sender,
					statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
						.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
					metrics: None,
					initial_sync_timeout: Box::pin(futures::future::pending()),
					pending_affinities_timeout: Box::pin(futures::future::pending()),
					pending_initial_syncs: HashMap::new(),
					initial_sync_peer_queue: VecDeque::new(),
					deferred_peers: HashSet::new(),
					dropped_statements_during_sync: dropped,
					sync_recovery_peer: None,
					sync_recovery_readd_timeout: Box::pin(pending().fuse()),
				}
			};

		// Flag unset: recovery must not start and no remove call is made.
		let net = TestNetwork::new();
		let mut handler = make_handler(net.clone(), false);
		handler.start_sync_recovery();
		assert!(handler.sync_recovery_peer.is_none());
		assert!(net.get_removed_reserved().is_empty());

		// Flag set: recovery starts and one remove call is issued.
		let net2 = TestNetwork::new();
		let mut handler2 = make_handler(net2.clone(), true);
		handler2.start_sync_recovery();
		assert!(handler2.sync_recovery_peer.is_some());
		assert_eq!(net2.get_removed_reserved().len(), 1);
	}
4271}