1use self::config::*;
18
19use codec::{Compact, Decode, Encode, MaxEncodedLen};
20#[cfg(any(test, feature = "test-helpers"))]
21use futures::future::pending;
22use futures::{channel::oneshot, future::FusedFuture, prelude::*, stream::FuturesUnordered};
23use governor::{
24 clock::DefaultClock,
25 state::{InMemoryState, NotKeyed},
26 Quota, RateLimiter,
27};
28use soil_prometheus::{
29 exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
30 Registry, U64,
31};
32use soil_network::sync::{SyncEvent, SyncEventStream};
33use soil_network::types::PeerId;
34use soil_network::{
35 config::{NonReservedPeerMode, SetConfig},
36 error, multiaddr,
37 peer_store::PeerStoreProvider,
38 service::{
39 traits::{NotificationEvent, NotificationService, ValidationResult},
40 NotificationMetrics,
41 },
42 types::ProtocolName,
43 utils::{interval, LruHashSet},
44 NetworkBackend, NetworkEventStream, NetworkPeers,
45};
46use soil_statement_store::{
47 FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
48};
49use std::{
50 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
51 iter,
52 num::{NonZeroU32, NonZeroUsize},
53 pin::Pin,
54 sync::Arc,
55 time::Instant,
56};
57use subsoil::runtime::traits::Block as BlockT;
58use tokio::time::timeout;
59pub mod config;
60
/// A batch of statements as exchanged in a single gossip notification.
pub type Statements = Vec<Statement>;
/// Future resolving to the outcome of submitting one statement to the store.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
65
/// Reputation changes applied to peers for statement-protocol behaviour.
mod rep {
	use soil_network::ReputationChange as Rep;
	/// Small cost charged for every statement received; see also the refund.
	pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
	/// Refund of [`ANY_STATEMENT`] once the statement turns out acceptable.
	pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
	/// Reward for a statement the store accepted as new.
	pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
	/// Penalty for a statement that failed validation.
	pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
	/// Penalty for re-sending a statement the same peer already sent us.
	pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
	/// Fatal change applied when a peer exceeds the statement rate limit.
	pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
}
84
/// Log target used by this module.
const LOG_TARGET: &str = "statement-gossip";
/// Upper bound on how long a single notification send may take before being
/// treated as failed.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval between initial-sync burst rounds for newly connected peers.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
90
/// Prometheus metrics for the statement gossip handler.
struct Metrics {
	// Statements propagated to at least one peer.
	propagated_statements: Counter<U64>,
	// Statements received that were already present in the store.
	known_statements_received: Counter<U64>,
	// Statements skipped because a single statement exceeded the payload limit.
	skipped_oversized_statements: Counter<U64>,
	// Distribution of chunk sizes used when propagating statements.
	propagated_statements_chunks: Histogram,
	// Currently pending statement validations.
	pending_statements: Gauge<U64>,
	// Statements dropped due to the MAX_PENDING_STATEMENTS limit.
	ignored_statements: Counter<U64>,
	// Peers currently connected on the statement protocol.
	peers_connected: Gauge<U64>,
	// Total statements received from peers.
	statements_received: Counter<U64>,
	// Total bytes sent on the statement protocol.
	bytes_sent_total: Counter<U64>,
	// Total bytes received on the statement protocol.
	bytes_received_total: Counter<U64>,
	// Latency of sending statement notifications.
	sent_latency_seconds: Histogram,
	// Statements sent during initial-sync bursts.
	initial_sync_statements_sent: Counter<U64>,
	// Initial-sync burst rounds processed.
	initial_sync_bursts_total: Counter<U64>,
	// Peers with an initial sync currently in flight.
	initial_sync_peers_active: Gauge<U64>,
	// Per-peer wall-clock duration of a full initial sync.
	initial_sync_duration_seconds: Histogram,
	// Peers disconnected for flooding.
	statement_flooding_detected: Counter<U64>,
}
109
impl Metrics {
	/// Registers every statement-gossip metric in `r` and returns the handles.
	///
	/// Fails if any metric name is already registered in the registry.
	// NOTE(review): metric names use the `substrate_sync_` prefix; the
	// "known_statement_received" name is singular while the field is plural —
	// left untouched because renaming would break existing dashboards.
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			propagated_statements: register(
				Counter::new(
					"substrate_sync_propagated_statements",
					"Number of statements propagated to at least one peer",
				)?,
				r,
			)?,
			known_statements_received: register(
				Counter::new(
					"substrate_sync_known_statement_received",
					"Number of statements received via gossiping that were already in the statement store",
				)?,
				r,
			)?,
			skipped_oversized_statements: register(
				Counter::new(
					"substrate_sync_skipped_oversized_statements",
					"Number of oversized statements that were skipped to be gossiped",
				)?,
				r,
			)?,
			propagated_statements_chunks: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_propagated_statements_chunks",
						"Distribution of chunk sizes when propagating statements",
					)
					.buckets(exponential_buckets(1.0, 2.0, 14)?),
				)?,
				r,
			)?,
			pending_statements: register(
				Gauge::new(
					"substrate_sync_pending_statement_validations",
					"Number of pending statement validations",
				)?,
				r,
			)?,
			ignored_statements: register(
				Counter::new(
					"substrate_sync_ignored_statements",
					"Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
				)?,
				r,
			)?,
			peers_connected: register(
				Gauge::new(
					"substrate_sync_statement_peers_connected",
					"Number of peers connected using the statement protocol",
				)?,
				r,
			)?,
			statements_received: register(
				Counter::new(
					"substrate_sync_statements_received",
					"Total number of statements received from peers",
				)?,
				r,
			)?,
			bytes_sent_total: register(
				Counter::new(
					"substrate_sync_statement_bytes_sent_total",
					"Total bytes sent for statement protocol messages",
				)?,
				r,
			)?,
			bytes_received_total: register(
				Counter::new(
					"substrate_sync_statement_bytes_received_total",
					"Total bytes received for statement protocol messages",
				)?,
				r,
			)?,
			sent_latency_seconds: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_statement_sent_latency_seconds",
						"Time to send statement messages to peers",
					)
					.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
				)?,
				r,
			)?,
			initial_sync_statements_sent: register(
				Counter::new(
					"substrate_sync_initial_sync_statements_sent",
					"Total statements sent during initial sync bursts to newly connected peers",
				)?,
				r,
			)?,
			initial_sync_bursts_total: register(
				Counter::new(
					"substrate_sync_initial_sync_bursts_total",
					"Total number of initial sync burst rounds processed",
				)?,
				r,
			)?,
			initial_sync_peers_active: register(
				Gauge::new(
					"substrate_sync_initial_sync_peers_active",
					"Number of peers currently being synced via initial sync",
				)?,
				r,
			)?,
			initial_sync_duration_seconds: register(
				Histogram::with_opts(
					HistogramOpts::new(
						"substrate_sync_initial_sync_duration_seconds",
						"Per-peer total duration of initial sync from start to completion",
					)
					.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
				)?,
				r,
			)?,
			statement_flooding_detected: register(
				Counter::new(
					"substrate_sync_statement_flooding_detected",
					"Number of peers disconnected for exceeding statement rate limits",
				)?,
				r,
			)?,
		})
	}
}
238
/// Prototype for a [`StatementHandler`]: holds the protocol name and the
/// notification service until [`StatementHandlerPrototype::build`] wires in
/// the network, sync and store services.
pub struct StatementHandlerPrototype {
	protocol_name: ProtocolName,
	notification_service: Box<dyn NotificationService>,
}
244
impl StatementHandlerPrototype {
	/// Creates a prototype plus the notification-protocol config that must be
	/// registered with the network backend.
	///
	/// The protocol name is derived from the genesis hash and optional fork id
	/// (`/<genesis>/<fork>/statement/1`).
	pub fn new<
		Hash: AsRef<[u8]>,
		Block: BlockT,
		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
	>(
		genesis_hash: Hash,
		fork_id: Option<&str>,
		metrics: NotificationMetrics,
		peer_store_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, Net::NotificationProtocolConfig) {
		let genesis_hash = genesis_hash.as_ref();
		let protocol_name = if let Some(fork_id) = fork_id {
			format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
		} else {
			format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
		};
		let (config, notification_service) = Net::notification_config(
			protocol_name.clone().into(),
			Vec::new(),
			MAX_STATEMENT_NOTIFICATION_SIZE,
			None,
			// No slots and no unreserved peers: peers are added to the
			// reserved set explicitly from sync events (see
			// `handle_sync_event`).
			SetConfig {
				in_peers: 0,
				out_peers: 0,
				reserved_nodes: Vec::new(),
				non_reserved_mode: NonReservedPeerMode::Deny,
			},
			metrics,
			peer_store_handle,
		);

		(Self { protocol_name: protocol_name.into(), notification_service }, config)
	}

	/// Turns the prototype into the actual handler.
	///
	/// Spawns `num_submission_workers` background tasks (via `executor`) that
	/// drain the validation queue and submit statements to the store. Zero
	/// worker count and a zero rate limit are coerced to safe defaults with a
	/// warning. Returns the handler, which must be polled via
	/// [`StatementHandler::run`].
	pub fn build<
		N: NetworkPeers + NetworkEventStream,
		S: SyncEventStream + soil_client::consensus::SyncOracle,
	>(
		self,
		network: N,
		sync: S,
		statement_store: Arc<dyn StatementStore>,
		metrics_registry: Option<&Registry>,
		executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
		mut num_submission_workers: usize,
		statements_per_second: u32,
	) -> error::Result<StatementHandler<N, S>> {
		let sync_event_stream = sync.event_stream("statement-handler-sync");
		// Bounded so that a slow store back-pressures the gossip intake.
		let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);

		if num_submission_workers == 0 {
			log::warn!(
				target: LOG_TARGET,
				"num_submission_workers is 0, defaulting to 1"
			);
			num_submission_workers = 1;
		}

		let statements_per_second = match NonZeroU32::new(statements_per_second) {
			Some(rate) => rate,
			None => {
				log::warn!(
					target: LOG_TARGET,
					"statements_per_second is 0, defaulting to {}",
					DEFAULT_STATEMENTS_PER_SECOND
				);
				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
			},
		};

		let metrics =
			if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };

		// Validation workers: each pulls (statement, completion) pairs off the
		// shared queue and submits them to the store.
		// NOTE(review): `store.submit` looks synchronous here — presumably it
		// is cheap or the executor tolerates blocking; confirm.
		for _ in 0..num_submission_workers {
			let store = statement_store.clone();
			let mut queue_receiver = queue_receiver.clone();
			executor(
				async move {
					loop {
						let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
							queue_receiver.next().await;
						match task {
							// Channel closed: the handler was dropped.
							None => return,
							Some((statement, completion)) => {
								let result = store.submit(statement, StatementSource::Network);
								if completion.send(result).is_err() {
									log::debug!(
										target: LOG_TARGET,
										"Error sending validation completion"
									);
								}
							},
						}
					}
				}
				.boxed(),
			);
		}

		let handler = StatementHandler {
			protocol_name: self.protocol_name,
			notification_service: self.notification_service,
			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
				.fuse(),
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream: sync_event_stream.fuse(),
			peers: HashMap::new(),
			statement_store,
			queue_sender,
			statements_per_second,
			metrics,
			initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
		};

		Ok(handler)
	}
}
375
/// Handler for statement gossiping: propagates statements to peers, imports
/// statements received from peers, and performs an initial sync of stored
/// statements to newly connected peers.
pub struct StatementHandler<
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + soil_client::consensus::SyncOracle,
> {
	protocol_name: ProtocolName,
	// Interval stream triggering periodic propagation of recent statements.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	// In-flight validations; each resolves to the statement hash and, if the
	// validation completed, the store's submit result.
	pending_statements:
		FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
	// For every pending statement, the set of peers that sent it to us.
	pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
	// Network service handle.
	network: N,
	// Sync oracle / event source.
	sync: S,
	// Stream of sync events, used to manage the reserved peer set.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	// Notification service for the statement protocol.
	notification_service: Box<dyn NotificationService>,
	// Per-peer gossip state for connected peers.
	peers: HashMap<PeerId, Peer>,
	// Backing statement store.
	statement_store: Arc<dyn StatementStore>,
	// Sender feeding the background validation workers.
	queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
	// Per-peer sustained rate limit applied to incoming statements.
	statements_per_second: NonZeroU32,
	// Prometheus metrics, if a registry was supplied at build time.
	metrics: Option<Metrics>,
	// Timer driving initial-sync burst rounds.
	initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
	// Remaining statement hashes still to be sent per newly connected peer.
	pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
	// Round-robin queue of peers awaiting their next initial-sync burst.
	initial_sync_peer_queue: VecDeque<PeerId>,
}
415
/// Token-bucket rate limiter applied to statements from a single peer.
#[derive(Debug)]
struct PeerRateLimiter {
	limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
424
425impl PeerRateLimiter {
426 fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
427 let quota = Quota::per_second(statements_per_second).allow_burst(burst);
428 Self { limiter: RateLimiter::direct(quota) }
429 }
430
431 fn is_flooding(&self, count: usize) -> bool {
433 if count > u32::MAX as usize {
434 return true;
435 }
436
437 let Some(n) = NonZeroU32::new(count as u32) else {
438 return false;
439 };
440 !matches!(self.limiter.check_n(n), Ok(Ok(())))
441 }
442}
443
/// Per-peer gossip state.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
	// Statement hashes known to the peer (bounded LRU; both sent and received).
	known_statements: LruHashSet<Hash>,
	// Rate limiter for statements received from this peer.
	rate_limiter: PeerRateLimiter,
}
453
/// State of an in-progress initial sync towards one newly connected peer.
struct PendingInitialSync {
	// Statement hashes still to be sent, consumed from the front.
	hashes: Vec<Hash>,
	// When the initial sync started; used for the duration metric.
	started_at: Instant,
}
459
/// Result of sizing a chunk of statements against the payload limit.
enum ChunkResult {
	// Send the first `n` statements (0 means the input slice was empty).
	Send(usize),
	// The first statement alone exceeds the payload limit; skip it.
	SkipOversized,
}
467
/// Outcome of attempting to send one chunk of statements to a peer.
enum SendChunkResult {
	// `n` statements were sent successfully.
	Sent(usize),
	// The leading statement was oversized and skipped.
	Skipped,
	// There was nothing to send.
	Empty,
	// The notification could not be delivered (e.g. timeout).
	Failed,
}
479
/// Maximum encoded statement payload per notification: the notification size
/// limit minus the worst-case SCALE length prefix (`Compact<u32>`) that
/// prefixes the encoded `Vec<Statement>`.
fn max_statement_payload_size() -> usize {
	MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len()
}
487
488fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
495 if statements.is_empty() {
496 return ChunkResult::Send(0);
497 }
498 let max_size = max_statement_payload_size();
499
500 let mut accumulated_size = 0;
505 let mut count = 0usize;
506
507 for stmt in &statements[0..] {
508 let stmt_size = stmt.encoded_size();
509 let new_count = count + 1;
510 let new_total = accumulated_size + stmt_size;
512 if new_total > max_size {
513 break;
514 }
515
516 accumulated_size += stmt_size;
517 count = new_count;
518 }
519
520 if count == 0 {
522 ChunkResult::SkipOversized
523 } else {
524 ChunkResult::Send(count)
525 }
526}
527
impl Peer {
	#[cfg(any(test, feature = "test-helpers"))]
	/// Test-only constructor assembling a `Peer` from its parts.
	pub fn new_for_testing(
		known_statements: LruHashSet<Hash>,
		statements_per_second: NonZeroU32,
		burst: NonZeroU32,
	) -> Self {
		Self { known_statements, rate_limiter: PeerRateLimiter::new(statements_per_second, burst) }
	}
}
539
540impl<N, S> StatementHandler<N, S>
541where
542 N: NetworkPeers + NetworkEventStream,
543 S: SyncEventStream + soil_client::consensus::SyncOracle,
544{
	#[cfg(any(test, feature = "test-helpers"))]
	/// Builds a handler directly from its parts for tests: metrics are
	/// disabled and the initial-sync timer never fires (`pending()`).
	pub fn new_for_testing(
		protocol_name: ProtocolName,
		notification_service: Box<dyn NotificationService>,
		propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
		network: N,
		sync: S,
		sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
		peers: HashMap<PeerId, Peer>,
		statement_store: Arc<dyn StatementStore>,
		queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
		statements_per_second: NonZeroU32,
	) -> Self {
		Self {
			protocol_name,
			notification_service,
			propagate_timeout,
			pending_statements: FuturesUnordered::new(),
			pending_statements_peers: HashMap::new(),
			network,
			sync,
			sync_event_stream,
			peers,
			statement_store,
			queue_sender,
			statements_per_second,
			metrics: None,
			initial_sync_timeout: Box::pin(pending().fuse()),
			pending_initial_syncs: HashMap::new(),
			initial_sync_peer_queue: VecDeque::new(),
		}
	}
578
	#[cfg(any(test, feature = "test-helpers"))]
	/// Test-only mutable access to the in-flight validation futures.
	pub fn pending_statements_mut(
		&mut self,
	) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
	{
		&mut self.pending_statements
	}
587
	/// Main event loop; drives propagation, validation completion, sync
	/// events, notification events and initial-sync bursts until both the
	/// sync-event and notification streams terminate.
	///
	/// `select_biased!` gives propagation/validation precedence over incoming
	/// events — keep the arm order when modifying.
	pub async fn run(mut self) {
		loop {
			futures::select_biased! {
				// Periodic propagation of recently stored statements.
				_ = self.propagate_timeout.next() => {
					self.propagate_statements().await;
					self.metrics.as_ref().map(|metrics| {
						metrics.pending_statements.set(self.pending_statements.len() as u64);
					});
				},
				// A pending validation completed: adjust reputations of every
				// peer that sent this statement.
				(hash, result) = self.pending_statements.select_next_some() => {
					if let Some(peers) = self.pending_statements_peers.remove(&hash) {
						if let Some(result) = result {
							peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
						}
					} else {
						log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
					}
				},
				// Sync events maintain the reserved peer set; stream end means
				// shutdown.
				sync_event = self.sync_event_stream.next() => {
					if let Some(sync_event) = sync_event {
						self.handle_sync_event(sync_event);
					} else {
						return;
					}
				}
				// Protocol notifications; stream end means shutdown.
				event = self.notification_service.next_event().fuse() => {
					if let Some(event) = event {
						self.handle_notification_event(event).await
					} else {
						return
					}
				}
				// Initial-sync burst timer; re-armed after each round.
				_ = &mut self.initial_sync_timeout => {
					self.process_initial_sync_burst().await;
					self.initial_sync_timeout =
						Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
				},
			}
		}
	}
632
633 async fn send_statement_chunk(
635 &mut self,
636 peer: &PeerId,
637 statements: &[&Statement],
638 ) -> SendChunkResult {
639 match find_sendable_chunk(statements) {
640 ChunkResult::Send(0) => SendChunkResult::Empty,
641 ChunkResult::Send(chunk_end) => {
642 let chunk = &statements[..chunk_end];
643 let encoded = chunk.encode();
644 let bytes_to_send = encoded.len() as u64;
645
646 let sent_latency_timer =
647 self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
648 let send_result = timeout(
649 SEND_TIMEOUT,
650 self.notification_service.send_async_notification(peer, encoded),
651 )
652 .await;
653 drop(sent_latency_timer);
654
655 if let Err(e) = send_result {
656 log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
657 return SendChunkResult::Failed;
658 }
659
660 log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
661 self.metrics.as_ref().map(|metrics| {
662 metrics.propagated_statements.inc_by(chunk.len() as u64);
663 metrics.bytes_sent_total.inc_by(bytes_to_send);
664 metrics.propagated_statements_chunks.observe(chunk.len() as f64);
665 });
666 SendChunkResult::Sent(chunk_end)
667 },
668 ChunkResult::SkipOversized => {
669 log::warn!(target: LOG_TARGET, "Statement too large, skipping");
670 self.metrics.as_ref().map(|metrics| {
671 metrics.skipped_oversized_statements.inc();
672 });
673 SendChunkResult::Skipped
674 },
675 }
676 }
677
678 fn handle_sync_event(&mut self, event: SyncEvent) {
679 match event {
680 SyncEvent::PeerConnected(remote) => {
681 let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
682 .collect::<multiaddr::Multiaddr>();
683 let result = self.network.add_peers_to_reserved_set(
684 self.protocol_name.clone(),
685 iter::once(addr).collect(),
686 );
687 if let Err(err) = result {
688 log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
689 }
690 },
691 SyncEvent::PeerDisconnected(remote) => {
692 let result = self.network.remove_peers_from_reserved_set(
693 self.protocol_name.clone(),
694 iter::once(remote).collect(),
695 );
696 if let Err(err) = result {
697 log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
698 }
699 },
700 }
701 }
702
	/// Reacts to events from the notification protocol: substream validation,
	/// peer connect/disconnect bookkeeping, and incoming statement batches.
	async fn handle_notification_event(&mut self, event: NotificationEvent) {
		match event {
			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
				// Accept inbound substreams only when the handshake maps to a
				// known peer role.
				let result = self
					.network
					.peer_role(peer, handshake)
					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
				let _ = result_tx.send(result);
			},
			NotificationEvent::NotificationStreamOpened { peer, .. } => {
				// Fresh per-peer state; burst capacity is the sustained rate
				// scaled by the configured coefficient.
				let _was_in = self.peers.insert(
					peer,
					Peer {
						known_statements: LruHashSet::new(
							NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
						),
						rate_limiter: PeerRateLimiter::new(
							self.statements_per_second,
							NonZeroU32::new(
								self.statements_per_second.get()
									* config::STATEMENTS_BURST_COEFFICIENT,
							)
							.expect("burst capacity is nonzero"),
						),
					},
				);
				debug_assert!(_was_in.is_none());

				self.metrics.as_ref().map(|metrics| {
					metrics.peers_connected.set(self.peers.len() as u64);
				});

				// Schedule an initial sync of all stored statements to the new
				// peer, unless we are still major-syncing.
				if !self.sync.is_major_syncing() {
					let hashes = self.statement_store.statement_hashes();
					if !hashes.is_empty() {
						self.pending_initial_syncs.insert(
							peer,
							PendingInitialSync { hashes, started_at: Instant::now() },
						);
						self.initial_sync_peer_queue.push_back(peer);
						self.metrics.as_ref().map(|metrics| {
							metrics.initial_sync_peers_active.inc();
						});
					}
				}
			},
			NotificationEvent::NotificationStreamClosed { peer } => {
				let _peer = self.peers.remove(&peer);
				debug_assert!(_peer.is_some());
				// Close out any initial sync still in flight for this peer.
				if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
					self.metrics.as_ref().map(|metrics| {
						metrics.initial_sync_peers_active.dec();
						metrics
							.initial_sync_duration_seconds
							.observe(pending.started_at.elapsed().as_secs_f64());
					});
				}
				self.initial_sync_peer_queue.retain(|p| *p != peer);
				self.metrics.as_ref().map(|metrics| {
					metrics.peers_connected.set(self.peers.len() as u64);
				});
			},
			NotificationEvent::NotificationReceived { peer, notification } => {
				let bytes_received = notification.len() as u64;
				self.metrics.as_ref().map(|metrics| {
					metrics.bytes_received_total.inc_by(bytes_received);
				});

				// Statements received while major-syncing are dropped.
				if self.sync.is_major_syncing() {
					log::trace!(
						target: LOG_TARGET,
						"{peer}: Ignoring statements while major syncing or offline"
					);
					return;
				}

				if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
					self.on_statements(peer, statements);
				} else {
					log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
				}
			},
		}
	}
789
790 #[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
792 pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
793 log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
794
795 self.metrics.as_ref().map(|metrics| {
796 metrics.statements_received.inc_by(statements.len() as u64);
797 });
798
799 if let Some(ref mut peer) = self.peers.get_mut(&who) {
800 if peer.rate_limiter.is_flooding(statements.len()) {
801 log::warn!(
802 target: LOG_TARGET,
803 "Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
804 who,
805 self.statements_per_second
806 );
807
808 self.network.report_peer(who, rep::STATEMENT_FLOODING);
809 self.network.disconnect_peer(who, self.protocol_name.clone());
810 if let Some(ref metrics) = self.metrics {
811 metrics.statement_flooding_detected.inc();
812 }
813
814 self.peers.remove(&who);
816 self.pending_initial_syncs.remove(&who);
817 self.initial_sync_peer_queue.retain(|p| *p != who);
818
819 return;
820 }
821
822 let mut statements_left = statements.len() as u64;
823 for s in statements {
824 if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
825 log::debug!(
826 target: LOG_TARGET,
827 "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
828 statements_left,
829 MAX_PENDING_STATEMENTS,
830 );
831 self.metrics.as_ref().map(|metrics| {
832 metrics.ignored_statements.inc_by(statements_left);
833 });
834 break;
835 }
836
837 let hash = s.hash();
838 peer.known_statements.insert(hash);
839
840 if self.statement_store.has_statement(&hash) {
841 self.metrics.as_ref().map(|metrics| {
842 metrics.known_statements_received.inc();
843 });
844
845 if let Some(peers) = self.pending_statements_peers.get(&hash) {
846 if peers.contains(&who) {
847 log::trace!(
848 target: LOG_TARGET,
849 "Already received the statement from the same peer {who}.",
850 );
851 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
852 }
853 }
854 continue;
855 }
856
857 self.network.report_peer(who, rep::ANY_STATEMENT);
858
859 match self.pending_statements_peers.entry(hash) {
860 Entry::Vacant(entry) => {
861 let (completion_sender, completion_receiver) = oneshot::channel();
862 match self.queue_sender.try_send((s, completion_sender)) {
863 Ok(()) => {
864 self.pending_statements.push(
865 async move {
866 let res = completion_receiver.await;
867 (hash, res.ok())
868 }
869 .boxed(),
870 );
871 entry.insert(HashSet::from_iter([who]));
872 },
873 Err(async_channel::TrySendError::Full(_)) => {
874 log::debug!(
875 target: LOG_TARGET,
876 "Dropped statement because validation channel is full",
877 );
878 },
879 Err(async_channel::TrySendError::Closed(_)) => {
880 log::trace!(
881 target: LOG_TARGET,
882 "Dropped statement because validation channel is closed",
883 );
884 },
885 }
886 },
887 Entry::Occupied(mut entry) => {
888 if !entry.get_mut().insert(who) {
889 self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
891 }
892 },
893 }
894
895 statements_left -= 1;
896 }
897 }
898 }
899
900 fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
901 match import {
902 SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
903 SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
904 SubmitResult::KnownExpired => {},
905 SubmitResult::Rejected(_) => {},
906 SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
907 SubmitResult::InternalError(_) => {},
908 }
909 }
910
911 pub async fn propagate_statement(&mut self, hash: &Hash) {
913 if self.sync.is_major_syncing() {
915 return;
916 }
917
918 log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
919 if let Ok(Some(statement)) = self.statement_store.statement(hash) {
920 self.do_propagate_statements(&[(*hash, statement)]).await;
921 }
922 }
923
924 async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
928 let Some(peer) = self.peers.get_mut(who) else {
929 return;
930 };
931
932 let to_send: Vec<_> = statements
933 .iter()
934 .filter_map(|(hash, stmt)| peer.known_statements.insert(*hash).then(|| stmt))
935 .collect();
936
937 log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
938
939 if to_send.is_empty() {
940 return;
941 }
942
943 self.send_statements_in_chunks(who, &to_send).await;
944 }
945
946 async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
948 let mut offset = 0;
949 while offset < statements.len() {
950 match self.send_statement_chunk(who, &statements[offset..]).await {
951 SendChunkResult::Sent(chunk_end) => {
952 offset += chunk_end;
953 },
954 SendChunkResult::Skipped => {
955 offset += 1;
956 },
957 SendChunkResult::Empty | SendChunkResult::Failed => return,
958 }
959 }
960 }
961
962 async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
963 log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
964 let peers: Vec<_> = self.peers.keys().copied().collect();
965 for who in peers {
966 log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
967 self.send_statements_to_peer(&who, statements).await;
968 }
969 log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
970 }
971
972 async fn propagate_statements(&mut self) {
974 if self.sync.is_major_syncing() {
976 return;
977 }
978
979 let Ok(statements) = self.statement_store.take_recent_statements() else { return };
980 if !statements.is_empty() {
981 self.do_propagate_statements(&statements).await;
982 }
983 }
984
985 fn record_initial_sync_completion(&self, started_at: Instant) {
987 self.metrics.as_ref().map(|metrics| {
988 metrics.initial_sync_peers_active.dec();
989 metrics
990 .initial_sync_duration_seconds
991 .observe(started_at.elapsed().as_secs_f64());
992 });
993 }
994
	/// Sends one payload-sized burst of stored statements to the next peer in
	/// the initial-sync queue, re-queueing the peer while hashes remain.
	async fn process_initial_sync_burst(&mut self) {
		if self.sync.is_major_syncing() {
			return;
		}

		let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
			return;
		};

		// Peer may have disconnected (or been dropped for flooding) since
		// being queued.
		let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
			return;
		};

		self.metrics.as_ref().map(|metrics| {
			metrics.initial_sync_bursts_total.inc();
		});

		if entry.get().hashes.is_empty() {
			let started_at = entry.get().started_at;
			entry.remove();
			self.record_initial_sync_completion(started_at);
			return;
		}

		// Take statements from the front of the hash list until the next one
		// would overflow a single notification payload. The first statement
		// is always taken (accumulated_size > 0 guard), even if oversized.
		let max_size = max_statement_payload_size();
		let mut accumulated_size = 0;
		let (statements, processed) = match self.statement_store.statements_by_hashes(
			&entry.get().hashes,
			&mut |_hash, encoded, _stmt| {
				if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
					return FilterDecision::Abort;
				}
				accumulated_size += encoded.len();
				FilterDecision::Take
			},
		) {
			Ok(r) => r,
			Err(e) => {
				// Store error: abandon this peer's initial sync entirely.
				log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
				let started_at = entry.get().started_at;
				entry.remove();
				self.record_initial_sync_completion(started_at);
				return;
			},
		};

		// Drop the hashes just attempted, whether or not they were found.
		entry.get_mut().hashes.drain(..processed);
		let has_more = !entry.get().hashes.is_empty();
		drop(entry);

		let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
		match self.send_statement_chunk(&peer_id, &to_send).await {
			SendChunkResult::Failed => {
				// Delivery failed: give up on this peer's initial sync.
				if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
					self.record_initial_sync_completion(pending.started_at);
				}
				return;
			},
			SendChunkResult::Sent(sent) => {
				debug_assert_eq!(to_send.len(), sent);
				self.metrics.as_ref().map(|metrics| {
					metrics.initial_sync_statements_sent.inc_by(sent as u64);
				});
				// Remember what the peer now knows so regular propagation does
				// not resend these statements.
				if let Some(peer) = self.peers.get_mut(&peer_id) {
					for (hash, _) in &statements {
						peer.known_statements.insert(*hash);
					}
				}
			},
			SendChunkResult::Empty | SendChunkResult::Skipped => {},
		}

		if has_more {
			// More hashes left: take another round-robin turn later.
			self.initial_sync_peer_queue.push_back(peer_id);
		} else {
			if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
				self.record_initial_sync_completion(pending.started_at);
			}
		}
	}
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085
1086 use super::*;
1087 use std::sync::Mutex;
1088
	/// Test network recording reputation reports and disconnect requests.
	#[derive(Clone)]
	struct TestNetwork {
		reported_peers: Arc<Mutex<Vec<(PeerId, soil_network::ReputationChange)>>>,
		disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
	}
1094
	impl TestNetwork {
		fn new() -> Self {
			Self {
				reported_peers: Arc::new(Mutex::new(Vec::new())),
				disconnected_peers: Arc::new(Mutex::new(Vec::new())),
			}
		}

		/// Snapshot of all reputation changes reported so far.
		fn get_reports(&self) -> Vec<(PeerId, soil_network::ReputationChange)> {
			self.reported_peers.lock().unwrap().clone()
		}

		/// Snapshot of all peers disconnected so far.
		fn get_disconnected_peers(&self) -> Vec<PeerId> {
			self.disconnected_peers.lock().unwrap().clone()
		}
	}
1111
	// Only `report_peer` and `disconnect_peer` are recorded; every other
	// method is intentionally `unimplemented!()` because the tests using this
	// mock never exercise them.
	#[async_trait::async_trait]
	impl NetworkPeers for TestNetwork {
		fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
			unimplemented!()
		}

		fn set_authorized_only(&self, _: bool) {
			unimplemented!()
		}

		fn add_known_address(&self, _: PeerId, _: soil_network::Multiaddr) {
			unimplemented!()
		}

		fn report_peer(&self, peer_id: PeerId, cost_benefit: soil_network::ReputationChange) {
			self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
		}

		fn peer_reputation(&self, _: &PeerId) -> i32 {
			unimplemented!()
		}

		fn disconnect_peer(&self, peer: PeerId, _: soil_network::ProtocolName) {
			self.disconnected_peers.lock().unwrap().push(peer);
		}

		fn accept_unreserved_peers(&self) {
			unimplemented!()
		}

		fn deny_unreserved_peers(&self) {
			unimplemented!()
		}

		fn add_reserved_peer(
			&self,
			_: soil_network::config::MultiaddrWithPeerId,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_reserved_peer(&self, _: PeerId) {
			unimplemented!()
		}

		fn set_reserved_peers(
			&self,
			_: soil_network::ProtocolName,
			_: std::collections::HashSet<soil_network::Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn add_peers_to_reserved_set(
			&self,
			_: soil_network::ProtocolName,
			_: std::collections::HashSet<soil_network::Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_peers_from_reserved_set(
			&self,
			_: soil_network::ProtocolName,
			_: Vec<PeerId>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn sync_num_connected(&self) -> usize {
			unimplemented!()
		}

		fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<soil_network::ObservedRole> {
			unimplemented!()
		}

		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
			unimplemented!();
		}
	}
1193
1194 struct TestSync {}
1195
impl SyncEventStream for TestSync {
    // The tests never consume sync events, so no stream is provided.
    fn event_stream(
        &self,
        _name: &'static str,
    ) -> Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>> {
        unimplemented!()
    }
}
1204
impl soil_client::consensus::SyncOracle for TestSync {
    // Always report the node as fully synced so the handler processes
    // statements instead of ignoring them during major sync.
    fn is_major_syncing(&self) -> bool {
        false
    }

    // Not exercised by these tests.
    fn is_offline(&self) -> bool {
        unimplemented!()
    }
}
1214
impl NetworkEventStream for TestNetwork {
    // The tests never consume generic network events, so no stream is provided.
    fn event_stream(
        &self,
        _name: &'static str,
    ) -> Pin<Box<dyn Stream<Item = soil_network::Event> + Send>> {
        unimplemented!()
    }
}
1223
/// `NotificationService` stub that records every notification it is asked to
/// send so tests can assert on the outgoing traffic.
#[derive(Debug, Clone)]
struct TestNotificationService {
    // (peer, encoded payload) pairs, in the order they were sent.
    sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
}
1228
1229 impl TestNotificationService {
1230 fn new() -> Self {
1231 Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
1232 }
1233
1234 fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
1235 self.sent_notifications.lock().unwrap().clone()
1236 }
1237 }
1238
1239 #[async_trait::async_trait]
1240 impl NotificationService for TestNotificationService {
1241 async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1242 unimplemented!()
1243 }
1244
1245 async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
1246 unimplemented!()
1247 }
1248
1249 fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
1250 self.sent_notifications.lock().unwrap().push((*peer, notification));
1251 }
1252
1253 async fn send_async_notification(
1254 &mut self,
1255 peer: &PeerId,
1256 notification: Vec<u8>,
1257 ) -> Result<(), soil_network::error::Error> {
1258 self.sent_notifications.lock().unwrap().push((*peer, notification));
1259 Ok(())
1260 }
1261
1262 async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1263 unimplemented!()
1264 }
1265
1266 fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
1267 unimplemented!()
1268 }
1269
1270 async fn next_event(&mut self) -> Option<soil_network::service::traits::NotificationEvent> {
1271 None
1272 }
1273
1274 fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
1275 unimplemented!()
1276 }
1277
1278 fn protocol(&self) -> &soil_network::types::ProtocolName {
1279 unimplemented!()
1280 }
1281
1282 fn message_sink(
1283 &self,
1284 _peer: &PeerId,
1285 ) -> Option<Box<dyn soil_network::service::traits::MessageSink>> {
1286 unimplemented!()
1287 }
1288 }
1289
/// Minimal in-memory statement store backing the handler under test.
#[derive(Clone)]
struct TestStatementStore {
    // Full set of known statements, keyed by hash.
    statements:
        Arc<Mutex<HashMap<soil_statement_store::Hash, soil_statement_store::Statement>>>,
    // Statements handed out (and removed) by `take_recent_statements`.
    recent_statements:
        Arc<Mutex<HashMap<soil_statement_store::Hash, soil_statement_store::Statement>>>,
}
1297
1298 impl TestStatementStore {
1299 fn new() -> Self {
1300 Self { statements: Default::default(), recent_statements: Default::default() }
1301 }
1302 }
1303
1304 impl StatementStore for TestStatementStore {
1305 fn statements(
1306 &self,
1307 ) -> soil_statement_store::Result<
1308 Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
1309 > {
1310 Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
1311 }
1312
1313 fn take_recent_statements(
1314 &self,
1315 ) -> soil_statement_store::Result<
1316 Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
1317 > {
1318 Ok(self.recent_statements.lock().unwrap().drain().collect())
1319 }
1320
1321 fn statement(
1322 &self,
1323 _hash: &soil_statement_store::Hash,
1324 ) -> soil_statement_store::Result<Option<soil_statement_store::Statement>> {
1325 unimplemented!()
1326 }
1327
1328 fn has_statement(&self, hash: &soil_statement_store::Hash) -> bool {
1329 self.statements.lock().unwrap().contains_key(hash)
1330 }
1331
1332 fn statement_hashes(&self) -> Vec<soil_statement_store::Hash> {
1333 self.statements.lock().unwrap().keys().cloned().collect()
1334 }
1335
1336 fn statements_by_hashes(
1337 &self,
1338 hashes: &[soil_statement_store::Hash],
1339 filter: &mut dyn FnMut(
1340 &soil_statement_store::Hash,
1341 &[u8],
1342 &soil_statement_store::Statement,
1343 ) -> FilterDecision,
1344 ) -> soil_statement_store::Result<(
1345 Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
1346 usize,
1347 )> {
1348 let statements = self.statements.lock().unwrap();
1349 let mut result = Vec::new();
1350 let mut processed = 0;
1351 for hash in hashes {
1352 let Some(stmt) = statements.get(hash) else {
1353 processed += 1;
1354 continue;
1355 };
1356 let encoded = stmt.encode();
1357 match filter(hash, &encoded, stmt) {
1358 FilterDecision::Skip => {
1359 processed += 1;
1360 },
1361 FilterDecision::Take => {
1362 processed += 1;
1363 result.push((*hash, stmt.clone()));
1364 },
1365 FilterDecision::Abort => break,
1366 }
1367 }
1368 Ok((result, processed))
1369 }
1370
1371 fn broadcasts(
1372 &self,
1373 _match_all_topics: &[soil_statement_store::Topic],
1374 ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1375 unimplemented!()
1376 }
1377
1378 fn posted(
1379 &self,
1380 _match_all_topics: &[soil_statement_store::Topic],
1381 _dest: [u8; 32],
1382 ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1383 unimplemented!()
1384 }
1385
1386 fn posted_clear(
1387 &self,
1388 _match_all_topics: &[soil_statement_store::Topic],
1389 _dest: [u8; 32],
1390 ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1391 unimplemented!()
1392 }
1393
1394 fn broadcasts_stmt(
1395 &self,
1396 _match_all_topics: &[soil_statement_store::Topic],
1397 ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1398 unimplemented!()
1399 }
1400
1401 fn posted_stmt(
1402 &self,
1403 _match_all_topics: &[soil_statement_store::Topic],
1404 _dest: [u8; 32],
1405 ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1406 unimplemented!()
1407 }
1408
1409 fn posted_clear_stmt(
1410 &self,
1411 _match_all_topics: &[soil_statement_store::Topic],
1412 _dest: [u8; 32],
1413 ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
1414 unimplemented!()
1415 }
1416
1417 fn submit(
1418 &self,
1419 _statement: soil_statement_store::Statement,
1420 _source: soil_statement_store::StatementSource,
1421 ) -> soil_statement_store::SubmitResult {
1422 unimplemented!()
1423 }
1424
1425 fn remove(&self, _hash: &soil_statement_store::Hash) -> soil_statement_store::Result<()> {
1426 unimplemented!()
1427 }
1428
1429 fn remove_by(&self, _who: [u8; 32]) -> soil_statement_store::Result<()> {
1430 unimplemented!()
1431 }
1432 }
1433
1434 fn build_handler() -> (
1435 StatementHandler<TestNetwork, TestSync>,
1436 TestStatementStore,
1437 TestNetwork,
1438 TestNotificationService,
1439 async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
1440 ) {
1441 let statement_store = TestStatementStore::new();
1442 let (queue_sender, queue_receiver) = async_channel::bounded(2);
1443 let network = TestNetwork::new();
1444 let notification_service = TestNotificationService::new();
1445 let peer_id = PeerId::random();
1446 let mut peers = HashMap::new();
1447 peers.insert(
1448 peer_id,
1449 Peer {
1450 known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
1451 rate_limiter: PeerRateLimiter::new(
1452 NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1453 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1454 NonZeroU32::new(
1455 DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
1456 )
1457 .expect("burst capacity is nonzero"),
1458 ),
1459 },
1460 );
1461
1462 let handler = StatementHandler {
1463 protocol_name: "/statement/1".into(),
1464 notification_service: Box::new(notification_service.clone()),
1465 propagate_timeout: (Box::pin(futures::stream::pending())
1466 as Pin<Box<dyn Stream<Item = ()> + Send>>)
1467 .fuse(),
1468 pending_statements: FuturesUnordered::new(),
1469 pending_statements_peers: HashMap::new(),
1470 network: network.clone(),
1471 sync: TestSync {},
1472 sync_event_stream: (Box::pin(futures::stream::pending())
1473 as Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>>)
1474 .fuse(),
1475 peers,
1476 statement_store: Arc::new(statement_store.clone()),
1477 queue_sender,
1478 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1479 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1480 metrics: None,
1481 initial_sync_timeout: Box::pin(futures::future::pending()),
1482 pending_initial_syncs: HashMap::new(),
1483 initial_sync_peer_queue: VecDeque::new(),
1484 };
1485 (handler, statement_store, network, notification_service, queue_receiver)
1486 }
1487
1488 #[tokio::test]
1489 async fn test_skips_processing_statements_that_already_in_store() {
1490 let (mut handler, statement_store, _network, _notification_service, queue_receiver) =
1491 build_handler();
1492
1493 let mut statement1 = Statement::new();
1494 statement1.set_plain_data(b"statement1".to_vec());
1495 let hash1 = statement1.hash();
1496
1497 statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
1498
1499 let mut statement2 = Statement::new();
1500 statement2.set_plain_data(b"statement2".to_vec());
1501 let hash2 = statement2.hash();
1502
1503 let peer_id = *handler.peers.keys().next().unwrap();
1504
1505 handler.on_statements(peer_id, vec![statement1, statement2]);
1506
1507 let to_submit = queue_receiver.try_recv();
1508 assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
1509
1510 let no_more = queue_receiver.try_recv();
1511 assert!(no_more.is_err(), "Expected only one statement to be queued");
1512 }
1513
1514 #[tokio::test]
1515 async fn test_reports_for_duplicate_statements() {
1516 let (mut handler, statement_store, network, _notification_service, queue_receiver) =
1517 build_handler();
1518
1519 let peer_id = *handler.peers.keys().next().unwrap();
1520
1521 let mut statement1 = Statement::new();
1522 statement1.set_plain_data(b"statement1".to_vec());
1523
1524 handler.on_statements(peer_id, vec![statement1.clone()]);
1525 {
1526 let (s, _) = queue_receiver.try_recv().unwrap();
1528 let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
1529 handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
1530 }
1531
1532 handler.on_statements(peer_id, vec![statement1]);
1533
1534 let reports = network.get_reports();
1535 assert_eq!(
1536 reports,
1537 vec![
1538 (peer_id, rep::ANY_STATEMENT), (peer_id, rep::ANY_STATEMENT_REFUND), (peer_id, rep::DUPLICATE_STATEMENT) ],
1542 "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
1543 reports
1544 );
1545 }
1546
1547 #[tokio::test]
1548 async fn test_splits_large_batches_into_smaller_chunks() {
1549 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1550 build_handler();
1551
1552 let num_statements = 30;
1553 let statement_size = 100 * 1024; for i in 0..num_statements {
1555 let mut statement = Statement::new();
1556 let mut data = vec![0u8; statement_size];
1557 data[0] = i as u8;
1558 statement.set_plain_data(data);
1559 let hash = statement.hash();
1560 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1561 }
1562
1563 handler.propagate_statements().await;
1564
1565 let sent = notification_service.get_sent_notifications();
1566 let mut total_statements_sent = 0;
1567 assert!(
1568 sent.len() == 3,
1569 "Expected batch to be split into 3 chunks, but got {} chunks",
1570 sent.len()
1571 );
1572 for (_peer, notification) in sent.iter() {
1573 assert!(
1574 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1575 "Notification size {} exceeds limit {}",
1576 notification.len(),
1577 MAX_STATEMENT_NOTIFICATION_SIZE
1578 );
1579 if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
1580 total_statements_sent += stmts.len();
1581 }
1582 }
1583
1584 assert_eq!(
1585 total_statements_sent, num_statements,
1586 "Expected all {} statements to be sent, but only {} were sent",
1587 num_statements, total_statements_sent
1588 );
1589 }
1590
1591 #[tokio::test]
1592 async fn test_skips_only_oversized_statements() {
1593 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1594 build_handler();
1595
1596 let mut statement1 = Statement::new();
1597 statement1.set_plain_data(vec![1u8; 100]);
1598 let hash1 = statement1.hash();
1599 statement_store
1600 .recent_statements
1601 .lock()
1602 .unwrap()
1603 .insert(hash1, statement1.clone());
1604
1605 let mut oversized1 = Statement::new();
1606 oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
1607 let hash_oversized1 = oversized1.hash();
1608 statement_store
1609 .recent_statements
1610 .lock()
1611 .unwrap()
1612 .insert(hash_oversized1, oversized1);
1613
1614 let mut statement2 = Statement::new();
1615 statement2.set_plain_data(vec![3u8; 100]);
1616 let hash2 = statement2.hash();
1617 statement_store
1618 .recent_statements
1619 .lock()
1620 .unwrap()
1621 .insert(hash2, statement2.clone());
1622
1623 let mut oversized2 = Statement::new();
1624 oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
1625 let hash_oversized2 = oversized2.hash();
1626 statement_store
1627 .recent_statements
1628 .lock()
1629 .unwrap()
1630 .insert(hash_oversized2, oversized2);
1631
1632 let mut statement3 = Statement::new();
1633 statement3.set_plain_data(vec![5u8; 100]);
1634 let hash3 = statement3.hash();
1635 statement_store
1636 .recent_statements
1637 .lock()
1638 .unwrap()
1639 .insert(hash3, statement3.clone());
1640
1641 handler.propagate_statements().await;
1642
1643 let sent = notification_service.get_sent_notifications();
1644
1645 let mut sent_hashes = sent
1646 .iter()
1647 .flat_map(|(_peer, notification)| {
1648 <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
1649 })
1650 .map(|s| s.hash())
1651 .collect::<Vec<_>>();
1652 sent_hashes.sort();
1653 let mut expected_hashes = vec![hash1, hash2, hash3];
1654 expected_hashes.sort();
1655 assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
1656 }
1657
1658 fn build_handler_no_peers() -> (
1659 StatementHandler<TestNetwork, TestSync>,
1660 TestStatementStore,
1661 TestNetwork,
1662 TestNotificationService,
1663 ) {
1664 let statement_store = TestStatementStore::new();
1665 let (queue_sender, _queue_receiver) = async_channel::bounded(2);
1666 let network = TestNetwork::new();
1667 let notification_service = TestNotificationService::new();
1668
1669 let handler = StatementHandler {
1670 protocol_name: "/statement/1".into(),
1671 notification_service: Box::new(notification_service.clone()),
1672 propagate_timeout: (Box::pin(futures::stream::pending())
1673 as Pin<Box<dyn Stream<Item = ()> + Send>>)
1674 .fuse(),
1675 pending_statements: FuturesUnordered::new(),
1676 pending_statements_peers: HashMap::new(),
1677 network: network.clone(),
1678 sync: TestSync {},
1679 sync_event_stream: (Box::pin(futures::stream::pending())
1680 as Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>>)
1681 .fuse(),
1682 peers: HashMap::new(),
1683 statement_store: Arc::new(statement_store.clone()),
1684 queue_sender,
1685 statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
1686 .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
1687 metrics: None,
1688 initial_sync_timeout: Box::pin(futures::future::pending()),
1689 pending_initial_syncs: HashMap::new(),
1690 initial_sync_peer_queue: VecDeque::new(),
1691 };
1692 (handler, statement_store, network, notification_service)
1693 }
1694
/// A freshly connected peer must receive the store's full contents via
/// multiple rate-limited initial-sync bursts, after which all sync
/// bookkeeping is cleaned up.
#[tokio::test]
async fn test_initial_sync_burst_single_peer() {
    let (mut handler, statement_store, _network, notification_service) =
        build_handler_no_peers();

    // 200 statements of ~100 KiB each — too much for a single burst.
    let num_statements = 200;
    let statement_size = 100 * 1024;
    let mut expected_hashes = Vec::new();
    for i in 0..num_statements {
        let mut statement = Statement::new();
        let mut data = vec![0u8; statement_size];
        // Two-byte counter keeps every statement (and hash) unique.
        data[0] = (i % 256) as u8;
        data[1] = (i / 256) as u8;
        statement.set_plain_data(data);
        let hash = statement.hash();
        expected_hashes.push(hash);
        statement_store.statements.lock().unwrap().insert(hash, statement);
    }

    let peer_id = PeerId::random();

    // Simulate the peer connecting; this should schedule an initial sync.
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer: peer_id,
            direction: soil_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;

    assert!(handler.peers.contains_key(&peer_id));
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.initial_sync_peer_queue.len(), 1);

    // Drive bursts until the sync completes, guarding against an infinite loop.
    let mut burst_count = 0;
    while handler.pending_initial_syncs.contains_key(&peer_id) {
        handler.process_initial_sync_burst().await;
        burst_count += 1;
        assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
    }

    // ~20 MiB of payload cannot fit in just a handful of size-capped bursts.
    assert!(
        burst_count >= 10,
        "Expected multiple bursts for 200 statements of 100KB each, got {}",
        burst_count
    );

    // Every stored statement must have been sent to the new peer.
    let sent = notification_service.get_sent_notifications();
    let mut sent_hashes: Vec<_> = sent
        .iter()
        .flat_map(|(peer, notification)| {
            assert_eq!(*peer, peer_id);
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    sent_hashes.sort();
    expected_hashes.sort();

    assert_eq!(
        sent_hashes.len(),
        expected_hashes.len(),
        "Expected {} statements to be sent, got {}",
        expected_hashes.len(),
        sent_hashes.len()
    );
    assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");

    // Sync state for the peer must be fully cleaned up afterwards.
    assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
    assert!(handler.initial_sync_peer_queue.is_empty());
}
1777
/// With several peers syncing at once, bursts must be served round-robin in
/// connection order, and every peer must end up with the full store.
#[tokio::test]
async fn test_initial_sync_burst_multiple_peers_round_robin() {
    let (mut handler, statement_store, _network, notification_service) =
        build_handler_no_peers();

    // 200 statements of ~100 KiB each so every peer needs many bursts.
    let num_statements = 200;
    let statement_size = 100 * 1024;
    let mut expected_hashes = Vec::new();
    for i in 0..num_statements {
        let mut statement = Statement::new();
        let mut data = vec![0u8; statement_size];
        // Two-byte counter keeps every statement (and hash) unique.
        data[0] = (i % 256) as u8;
        data[1] = (i / 256) as u8;
        statement.set_plain_data(data);
        let hash = statement.hash();
        expected_hashes.push(hash);
        statement_store.statements.lock().unwrap().insert(hash, statement);
    }

    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let peer3 = PeerId::random();

    // Connect the peers in a fixed order; each connection enqueues a sync.
    for peer in [peer1, peer2, peer3] {
        handler
            .handle_notification_event(NotificationEvent::NotificationStreamOpened {
                peer,
                direction: soil_network::service::traits::Direction::Inbound,
                handshake: vec![],
                negotiated_fallback: None,
            })
            .await;
    }

    assert_eq!(handler.peers.len(), 3);
    assert_eq!(handler.pending_initial_syncs.len(), 3);
    assert_eq!(handler.initial_sync_peer_queue.len(), 3);

    // Record which peer is at the head of the queue before each burst.
    let mut peer_burst_order = Vec::new();
    let mut burst_count = 0;

    while !handler.pending_initial_syncs.is_empty() {
        if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
            peer_burst_order.push(next_peer);
        }
        handler.process_initial_sync_burst().await;
        burst_count += 1;
        assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
    }

    assert!(
        burst_count >= 30,
        "Expected many bursts for 3 peers with 200 statements each, got {}",
        burst_count
    );

    // The first two full rounds must alternate peer1 -> peer2 -> peer3.
    assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
    assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
    assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
    assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
    assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
    assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
    assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");

    // Each peer must have received every stored statement.
    let sent = notification_service.get_sent_notifications();
    let mut peer1_hashes: Vec<_> = sent
        .iter()
        .filter(|(peer, _)| *peer == peer1)
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    let mut peer2_hashes: Vec<_> = sent
        .iter()
        .filter(|(peer, _)| *peer == peer2)
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    let mut peer3_hashes: Vec<_> = sent
        .iter()
        .filter(|(peer, _)| *peer == peer3)
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();

    peer1_hashes.sort();
    peer2_hashes.sort();
    peer3_hashes.sort();
    expected_hashes.sort();

    assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
    assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
    assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");

    // All sync bookkeeping must be cleaned up.
    assert!(handler.pending_initial_syncs.is_empty());
    assert!(handler.initial_sync_peer_queue.is_empty());
}
1894
1895 #[tokio::test]
1896 async fn test_send_statements_in_chunks_exact_max_size() {
1897 let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
1898 build_handler();
1899
1900 let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
1914 let num_statements: usize = 100;
1915 let per_statement_overhead = 1 + 1 + 8 + 1 + 2; let total_overhead = per_statement_overhead * num_statements;
1917 let total_data_size = max_size - total_overhead;
1918 let per_statement_data_size = total_data_size / num_statements;
1919 let remainder = total_data_size % num_statements;
1920
1921 let mut expected_hashes = Vec::with_capacity(num_statements);
1922 let mut total_encoded_size = 0;
1923
1924 for i in 0..num_statements {
1925 let mut statement = Statement::new();
1926 let extra = if i < remainder { 1 } else { 0 };
1928 let mut data = vec![42u8; per_statement_data_size + extra];
1929 data[0] = i as u8;
1931 data[1] = (i >> 8) as u8;
1932 statement.set_plain_data(data);
1933
1934 total_encoded_size += statement.encoded_size();
1935
1936 let hash = statement.hash();
1937 expected_hashes.push(hash);
1938 statement_store.recent_statements.lock().unwrap().insert(hash, statement);
1939 }
1940
1941 assert!(
1943 total_encoded_size == max_size,
1944 "Total encoded size {} should be <= max_size {}",
1945 total_encoded_size,
1946 max_size
1947 );
1948
1949 handler.propagate_statements().await;
1950
1951 let sent = notification_service.get_sent_notifications();
1952
1953 assert_eq!(
1955 sent.len(),
1956 1,
1957 "Expected 1 notification for all {} statements, but got {}",
1958 num_statements,
1959 sent.len()
1960 );
1961
1962 let (_peer, notification) = &sent[0];
1963 assert!(
1964 notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
1965 "Notification size {} exceeds limit {}",
1966 notification.len(),
1967 MAX_STATEMENT_NOTIFICATION_SIZE
1968 );
1969
1970 let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
1971 assert_eq!(
1972 decoded.len(),
1973 num_statements,
1974 "Expected {} statements in the notification",
1975 num_statements
1976 );
1977
1978 let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
1980 expected_hashes.sort();
1981 received_hashes.sort();
1982 assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
1983 }
1984
/// Two statements whose combined encoded size just exceeds the per-burst
/// payload limit: the first burst must carry exactly one of them and the
/// second burst must deliver the other.
#[tokio::test]
async fn test_initial_sync_burst_size_limit_consistency() {
    let (mut handler, statement_store, _network, notification_service) =
        build_handler_no_peers();

    let payload_limit = max_statement_payload_size();

    // First statement fills a bit more than half of the budget.
    let first_stmt_data_size = payload_limit / 2 + 10;
    let mut stmt1 = Statement::new();
    stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
    let stmt1_encoded_size = stmt1.encoded_size();

    // Aim the second statement's encoded size slightly past the remaining
    // budget. NOTE(review): the +3 / -4 adjustments compensate for the SCALE
    // encoding overhead of the plain-data field — confirm against
    // `Statement::encode` if the layout changes.
    let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
    let target_stmt2_encoded = remaining + 3;
    let stmt2_data_size = target_stmt2_encoded.saturating_sub(4);
    let mut stmt2 = Statement::new();
    stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
    let stmt2_encoded_size = stmt2.encoded_size();

    let total_encoded = stmt1_encoded_size + stmt2_encoded_size;

    // Sanity-check the setup: together they must overflow one burst.
    assert!(
        total_encoded > payload_limit,
        "Total {} should exceed payload_limit {} so filter rejects second statement",
        total_encoded,
        payload_limit
    );

    let hash1 = stmt1.hash();
    let hash2 = stmt2.hash();
    statement_store.statements.lock().unwrap().insert(hash1, stmt1);
    statement_store.statements.lock().unwrap().insert(hash2, stmt2);

    let peer_id = PeerId::random();

    // Connecting the peer schedules an initial sync covering both statements.
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer: peer_id,
            direction: soil_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;

    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);

    // First burst: only one statement fits under the payload limit.
    handler.process_initial_sync_burst().await;

    let sent = notification_service.get_sent_notifications();
    assert_eq!(sent.len(), 1, "First burst should send one notification");

    let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
    assert_eq!(decoded.len(), 1, "First notification should contain one statement");

    // Iteration order over the store is unspecified, so either statement may
    // be the one that goes first.
    let sent_hash = decoded[0].hash();
    assert!(
        sent_hash == hash1 || sent_hash == hash2,
        "Sent statement should be one of the two created"
    );

    // The other statement must still be pending for this peer.
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);

    // Second burst delivers the remaining statement.
    handler.process_initial_sync_burst().await;

    let sent = notification_service.get_sent_notifications();
    assert_eq!(sent.len(), 2, "Second burst should send another notification");

    let mut sent_hashes: Vec<_> = sent
        .iter()
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    sent_hashes.sort();
    let mut expected_hashes = vec![hash1, hash2];
    expected_hashes.sort();
    assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");

    // The sync for this peer is complete and cleaned up.
    assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
}
2092
2093 #[tokio::test]
2094 async fn test_peer_disconnected_on_flooding() {
2095 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2096 build_handler();
2097
2098 let peer_id = *handler.peers.keys().next().unwrap();
2099
2100 let mut flood_statements = Vec::new();
2101 for i in 0..600_000 {
2102 let mut statement = Statement::new();
2103 statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
2104 flood_statements.push(statement);
2105 }
2106
2107 handler.on_statements(peer_id, flood_statements);
2108
2109 let reports = network.get_reports();
2110 assert!(
2111 reports
2112 .iter()
2113 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2114 "Expected STATEMENT_FLOODING reputation change, but got: {:?}",
2115 reports
2116 );
2117
2118 let disconnected = network.get_disconnected_peers();
2119 assert!(
2120 disconnected.contains(&peer_id),
2121 "Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
2122 peer_id,
2123 disconnected
2124 );
2125
2126 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2128 assert!(
2129 !handler.pending_initial_syncs.contains_key(&peer_id),
2130 "Peer should be removed from pending_initial_syncs"
2131 );
2132 assert!(
2133 !handler.initial_sync_peer_queue.contains(&peer_id),
2134 "Peer should be removed from initial_sync_peer_queue"
2135 );
2136 }
2137
/// Sustained traffic paced below the rate limit (5k statements every 100 ms
/// over five wall-clock seconds) must never be treated as flooding.
#[tokio::test]
async fn test_legitimate_traffic_not_flagged() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
        build_handler();

    let peer_id = *handler.peers.keys().next().unwrap();

    // Real-time pacing: batches are spaced by actual sleeps so the rate
    // limiter's clock sees a legitimate sending rate.
    let start = std::time::Instant::now();
    let duration = std::time::Duration::from_secs(5);
    let mut counter = 0u32;

    while start.elapsed() < duration {
        let mut statements = Vec::new();
        for i in 0..5_000 {
            let mut statement = Statement::new();
            // Unique payload per statement so none count as duplicates.
            statement.set_plain_data(vec![
                counter as u8,
                (counter >> 8) as u8,
                (counter >> 16) as u8,
                i as u8,
            ]);
            statements.push(statement);
            counter = counter.wrapping_add(1);
        }

        handler.on_statements(peer_id, statements);

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }

    // No flooding report, no disconnect, peer still registered.
    let reports = network.get_reports();
    assert!(
        !reports
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
        "Legitimate traffic should not trigger flooding detection. Reports: {:?}",
        reports
    );

    let disconnected = network.get_disconnected_peers();
    assert!(
        !disconnected.contains(&peer_id),
        "Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
        disconnected
    );

    assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
}
2186
2187 #[tokio::test]
2188 async fn test_just_over_rate_limit_triggers_flooding() {
2189 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2190 build_handler();
2191
2192 let peer_id = *handler.peers.keys().next().unwrap();
2193
2194 let mut statements = Vec::new();
2195 for i in 0..260_000 {
2196 let mut statement = Statement::new();
2197 statement.set_plain_data(vec![
2198 i as u8,
2199 (i >> 8) as u8,
2200 (i >> 16) as u8,
2201 (i >> 24) as u8,
2202 ]);
2203 statements.push(statement);
2204 }
2205
2206 handler.on_statements(peer_id, statements);
2207
2208 let reports = network.get_reports();
2209 let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
2210 assert!(
2211 reports
2212 .iter()
2213 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2214 "Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
2215 expected_burst,
2216 reports
2217 );
2218
2219 let disconnected = network.get_disconnected_peers();
2220 assert!(
2221 disconnected.contains(&peer_id),
2222 "Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
2223 disconnected
2224 );
2225
2226 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2227 }
2228
2229 #[tokio::test]
2230 async fn test_burst_of_250k_statements_allowed() {
2231 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2232 build_handler();
2233
2234 let peer_id = *handler.peers.keys().next().unwrap();
2235
2236 let mut statements = Vec::new();
2237 for i in 0..250_000 {
2238 let mut statement = Statement::new();
2239 statement.set_plain_data(vec![
2240 i as u8,
2241 (i >> 8) as u8,
2242 (i >> 16) as u8,
2243 (i >> 24) as u8,
2244 ]);
2245 statements.push(statement);
2246 }
2247
2248 handler.on_statements(peer_id, statements);
2249
2250 let reports = network.get_reports();
2251 assert!(
2252 !reports
2253 .iter()
2254 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
2255 "250k burst should be allowed (burst = rate × 5). Reports: {:?}",
2256 reports
2257 );
2258
2259 assert!(
2260 handler.peers.contains_key(&peer_id),
2261 "Peer should still be connected after 250k burst"
2262 );
2263 }
2264
2265 #[tokio::test]
2266 async fn test_sustained_rate_above_limit_triggers_flooding() {
2267 let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
2268 build_handler();
2269
2270 let peer_id = *handler.peers.keys().next().unwrap();
2271
2272 let mut counter = 0u32;
2273
2274 let start = std::time::Instant::now();
2275 let duration = std::time::Duration::from_secs(5);
2276
2277 let mut flooding_detected = false;
2278 while start.elapsed() < duration {
2279 let mut statements = Vec::new();
2280 for i in 0..30_000 {
2281 let mut statement = Statement::new();
2282 statement.set_plain_data(vec![
2283 counter as u8,
2284 (counter >> 8) as u8,
2285 (counter >> 16) as u8,
2286 i as u8,
2287 ]);
2288 statements.push(statement);
2289 counter = counter.wrapping_add(1);
2290 }
2291
2292 handler.on_statements(peer_id, statements);
2293
2294 let reports = network.get_reports();
2296 if reports
2297 .iter()
2298 .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING)
2299 {
2300 flooding_detected = true;
2301 break;
2302 }
2303
2304 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2305 }
2306
2307 assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
2308
2309 let disconnected = network.get_disconnected_peers();
2310 assert!(
2311 disconnected.contains(&peer_id),
2312 "Peer should be disconnected after sustained high rate. Disconnected: {:?}",
2313 disconnected
2314 );
2315
2316 assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
2317 }
2318}