mod affinity;
use crate::config::*;
use affinity::AffinityFilter;
use codec::{Compact, Decode, Encode, MaxEncodedLen};
use futures::{
channel::oneshot,
future::{pending, FusedFuture},
prelude::*,
stream::FuturesUnordered,
};
use governor::{
clock::DefaultClock,
state::{InMemoryState, NotKeyed},
Quota, RateLimiter,
};
use prometheus_endpoint::{
exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
Registry, U64,
};
use rand::seq::IteratorRandom;
use sc_network::{
config::{NonReservedPeerMode, SetConfig},
error, multiaddr,
peer_store::PeerStoreProvider,
service::{
traits::{NotificationEvent, NotificationService, ValidationResult},
NotificationMetrics,
},
types::ProtocolName,
utils::{interval, LruHashSet},
NetworkBackend, NetworkEventStream, NetworkPeers,
};
use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_network_types::PeerId;
use sp_runtime::traits::Block as BlockT;
use sp_statement_store::{
FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
iter,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
sync::Arc,
time::Instant,
};
use tokio::time::timeout;
pub mod config;
/// A batch of statements as carried in a single protocol message.
pub type Statements = Vec<Statement>;
/// Statement protocol version negotiated with a peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeerProtocolVersion {
    /// Legacy protocol: notifications are a bare SCALE-encoded `Vec<Statement>`.
    V1,
    /// Current protocol: notifications carry a `StatementMessage` envelope.
    V2,
}
impl PeerProtocolVersion {
    /// Number of framing bytes reserved around the statement payload for this
    /// protocol version.
    fn envelope_overhead(&self) -> usize {
        match self {
            Self::V1 => V1_ENVELOPE_OVERHEAD,
            Self::V2 => V2_ENVELOPE_OVERHEAD,
        }
    }
}
/// Wire format of a v2 notification.
#[derive(Debug, Encode, Decode)]
enum StatementMessage {
    /// A batch of statements for the receiver to import.
    #[codec(index = 0)]
    Statements(Vec<Statement>),
    /// A topic-affinity filter restricting which statements the sender wants
    /// to receive from us.
    #[codec(index = 1)]
    ExplicitTopicAffinity(AffinityFilter),
}
// SCALE variant index of `StatementMessage::Statements`; must stay in sync
// with the `#[codec(index = 0)]` attribute on that variant.
const STATEMENTS_VARIANT_INDEX: u8 = 0;
impl StatementMessage {
    /// Encodes borrowed statements as a `StatementMessage::Statements` by
    /// manually prepending the variant index, avoiding a clone of every
    /// statement into an owned `Vec<Statement>`.
    fn encode_statement_refs(statements: &[&Statement]) -> Vec<u8> {
        // `u8::encode()` yields the single variant-index byte; the statement
        // slice is appended in place, exactly as the derived codec would lay
        // out the `Statements` variant.
        let mut encoded = STATEMENTS_VARIANT_INDEX.encode();
        statements.encode_to(&mut encoded);
        encoded
    }
}
/// Future resolving to the outcome of a statement queued for validation.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
/// Reputation adjustments applied to peers based on their behavior.
mod rep {
    use sc_network::ReputationChange as Rep;
    /// Cost charged for any received statement; refunded if it was already known.
    pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
    /// Refund of `ANY_STATEMENT` when the statement turns out to be known.
    pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
    /// Reward for a statement that imported as new.
    pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
    /// Penalty for a statement that failed validation.
    pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
    /// Penalty for re-sending a statement we already saw from the same peer.
    pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
    /// Fatal penalty for exceeding the per-peer statement rate limit.
    pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
    /// Penalty for an undecodable statement message.
    pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad statement message");
}
/// Log target used by this module.
const LOG_TARGET: &str = "statement-gossip";
/// Protocol path segment of the current (v2) statement protocol.
const STATEMENT_PROTOCOL_V2: &str = "statement/2";
/// Protocol path segment of the legacy (v1) protocol, used as a fallback.
const STATEMENT_PROTOCOL_V1: &str = "statement/1";
/// Maximum time to wait for a single notification send before giving up.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval between initial-sync burst rounds.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
/// Interval at which pending topic-affinity filters are activated.
const PENDING_AFFINITIES_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// Grace period before a peer removed for sync recovery is re-added to the
/// reserved set.
const SYNC_RECOVERY_READD_DELAY: std::time::Duration = std::time::Duration::from_secs(60);
/// Prometheus metrics of the statement gossip handler; see the help strings
/// in [`Metrics::register`] for the exact semantics of each series.
struct Metrics {
    // Statements propagated, counted once per recipient.
    propagated_statements: Counter<U64>,
    // Received statements that were already in the store.
    known_statements_received: Counter<U64>,
    // Statements skipped because they exceed the notification size budget.
    skipped_oversized_statements: Counter<U64>,
    // Distribution of chunk sizes when propagating.
    propagated_statements_chunks: Histogram,
    // Currently pending validations, sampled once per propagation tick.
    pending_statements: Gauge<U64>,
    // Statements dropped due to the MAX_PENDING_STATEMENTS limit.
    ignored_statements: Counter<U64>,
    // Peers currently connected on the statement protocol.
    peers_connected: Gauge<U64>,
    // Total statements received from peers.
    statements_received: Counter<U64>,
    // Total bytes sent / received for statement protocol messages.
    bytes_sent_total: Counter<U64>,
    bytes_received_total: Counter<U64>,
    // Time spent sending statement messages to peers.
    sent_latency_seconds: Histogram,
    // Initial-sync bookkeeping.
    initial_sync_statements_sent: Counter<U64>,
    initial_sync_bursts_total: Counter<U64>,
    initial_sync_peers_active: Gauge<U64>,
    initial_sync_duration_seconds: Histogram,
    // Peers disconnected for exceeding the statement rate limit.
    statement_flooding_detected: Counter<U64>,
}
impl Metrics {
    /// Registers every statement-gossip metric in `r` and returns the handles.
    fn register(r: &Registry) -> Result<Self, PrometheusError> {
        Ok(Self {
            propagated_statements: register(
                Counter::new(
                    "substrate_sync_propagated_statements",
                    "Total statements propagated to peers, counted once per recipient (a statement sent to N peers increments by N)",
                )?,
                r,
            )?,
            known_statements_received: register(
                Counter::new(
                    "substrate_sync_known_statement_received",
                    "Number of statements received via gossiping that were already in the statement store",
                )?,
                r,
            )?,
            skipped_oversized_statements: register(
                Counter::new(
                    "substrate_sync_skipped_oversized_statements",
                    "Number of oversized statements that were skipped to be gossiped",
                )?,
                r,
            )?,
            propagated_statements_chunks: register(
                Histogram::with_opts(
                    HistogramOpts::new(
                        "substrate_sync_propagated_statements_chunks",
                        "Distribution of chunk sizes when propagating statements",
                    )
                    .buckets(exponential_buckets(1.0, 2.0, 14)?),
                )?,
                r,
            )?,
            pending_statements: register(
                Gauge::new(
                    "substrate_sync_pending_statement_validations",
                    "Number of pending statement validations, sampled once per propagation tick",
                )?,
                r,
            )?,
            ignored_statements: register(
                Counter::new(
                    "substrate_sync_ignored_statements",
                    "Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
                )?,
                r,
            )?,
            peers_connected: register(
                Gauge::new(
                    "substrate_sync_statement_peers_connected",
                    "Number of peers connected using the statement protocol",
                )?,
                r,
            )?,
            statements_received: register(
                Counter::new(
                    "substrate_sync_statements_received",
                    "Total number of statements received from peers",
                )?,
                r,
            )?,
            bytes_sent_total: register(
                Counter::new(
                    "substrate_sync_statement_bytes_sent_total",
                    "Total bytes sent for statement protocol messages",
                )?,
                r,
            )?,
            bytes_received_total: register(
                Counter::new(
                    "substrate_sync_statement_bytes_received_total",
                    "Total bytes received for statement protocol messages (includes bytes from notifications that are later discarded — e.g. while major-syncing)",
                )?,
                r,
            )?,
            sent_latency_seconds: register(
                Histogram::with_opts(
                    HistogramOpts::new(
                        "substrate_sync_statement_sent_latency_seconds",
                        "Time to send statement messages to peers",
                    )
                    .buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
                )?,
                r,
            )?,
            initial_sync_statements_sent: register(
                Counter::new(
                    "substrate_sync_initial_sync_statements_sent",
                    "Total statements sent during initial sync bursts to newly connected peers",
                )?,
                r,
            )?,
            initial_sync_bursts_total: register(
                Counter::new(
                    "substrate_sync_initial_sync_bursts_total",
                    "Total initial-sync burst rounds attempted (includes rounds that return early with no hashes left)",
                )?,
                r,
            )?,
            initial_sync_peers_active: register(
                Gauge::new(
                    "substrate_sync_initial_sync_peers_active",
                    "Number of peers currently being synced via initial sync",
                )?,
                r,
            )?,
            initial_sync_duration_seconds: register(
                Histogram::with_opts(
                    HistogramOpts::new(
                        "substrate_sync_initial_sync_duration_seconds",
                        "Per-peer duration of initial sync from start until completion or peer disconnect (whichever comes first)",
                    )
                    .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
                )?,
                r,
            )?,
            statement_flooding_detected: register(
                Counter::new(
                    "substrate_sync_statement_flooding_detected",
                    "Number of peers disconnected for exceeding statement rate limits",
                )?,
                r,
            )?,
        })
    }
}
/// Prototype for a [`StatementHandler`]: holds the negotiated protocol name
/// and the notification service created by the network backend, to be turned
/// into the running handler via `build`.
pub struct StatementHandlerPrototype {
    protocol_name: ProtocolName,
    notification_service: Box<dyn NotificationService>,
}
impl StatementHandlerPrototype {
    /// Creates a prototype together with the notification protocol
    /// configuration to register with the network backend.
    ///
    /// The v2 protocol name is derived from the genesis hash and optional
    /// fork id; the matching v1 name is advertised as a negotiation fallback
    /// so legacy peers can still connect.
    pub fn new<
        Hash: AsRef<[u8]>,
        Block: BlockT,
        Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
    >(
        genesis_hash: Hash,
        fork_id: Option<&str>,
        metrics: NotificationMetrics,
        peer_store_handle: Arc<dyn PeerStoreProvider>,
    ) -> (Self, Net::NotificationProtocolConfig) {
        let genesis_hash = genesis_hash.as_ref();
        let hex = array_bytes::bytes2hex("", genesis_hash);
        let (protocol_name, fallback_name) = if let Some(fork_id) = fork_id {
            (
                format!("/{hex}/{fork_id}/{STATEMENT_PROTOCOL_V2}"),
                format!("/{hex}/{fork_id}/{STATEMENT_PROTOCOL_V1}"),
            )
        } else {
            (format!("/{hex}/{STATEMENT_PROTOCOL_V2}"), format!("/{hex}/{STATEMENT_PROTOCOL_V1}"))
        };
        // Peers are managed explicitly through the reserved set, so no free
        // slots are allocated and non-reserved peers are denied.
        let (config, notification_service) = Net::notification_config(
            protocol_name.clone().into(),
            vec![fallback_name.into()],
            MAX_STATEMENT_NOTIFICATION_SIZE,
            None,
            SetConfig {
                in_peers: 0,
                out_peers: 0,
                reserved_nodes: Vec::new(),
                non_reserved_mode: NonReservedPeerMode::Deny,
            },
            metrics,
            peer_store_handle,
        );
        (Self { protocol_name: protocol_name.into(), notification_service }, config)
    }
    /// Turns the prototype into the actual statement handler.
    ///
    /// Spawns `num_submission_workers` (forced to at least 1) background
    /// tasks via `executor`; each pulls statements from a bounded queue of
    /// capacity `MAX_PENDING_STATEMENTS` and submits them to the store.
    pub fn build<
        N: NetworkPeers + NetworkEventStream,
        S: SyncEventStream + sp_consensus::SyncOracle,
    >(
        self,
        network: N,
        sync: S,
        statement_store: Arc<dyn StatementStore>,
        metrics_registry: Option<&Registry>,
        executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
        mut num_submission_workers: usize,
        statements_per_second: u32,
    ) -> error::Result<StatementHandler<N, S>> {
        let sync_event_stream = sync.event_stream("statement-handler-sync");
        let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
        // Guard against a configuration that would spawn no workers at all.
        if num_submission_workers == 0 {
            log::warn!(
                target: LOG_TARGET,
                "num_submission_workers is 0, defaulting to 1"
            );
            num_submission_workers = 1;
        }
        // A zero rate would make the limiter reject everything; fall back to
        // the default instead.
        let statements_per_second = match NonZeroU32::new(statements_per_second) {
            Some(rate) => rate,
            None => {
                log::warn!(
                    target: LOG_TARGET,
                    "statements_per_second is 0, defaulting to {}",
                    DEFAULT_STATEMENTS_PER_SECOND
                );
                NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
                    .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
            },
        };
        let metrics =
            if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };
        // Submission workers: validate and submit statements off the event loop.
        for _ in 0..num_submission_workers {
            let store = statement_store.clone();
            let mut queue_receiver = queue_receiver.clone();
            executor(
                async move {
                    loop {
                        let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
                            queue_receiver.next().await;
                        match task {
                            // Channel closed: the handler is gone, stop the worker.
                            None => return,
                            Some((statement, completion)) => {
                                let result = store.submit(statement, StatementSource::Network);
                                if completion.send(result).is_err() {
                                    log::debug!(
                                        target: LOG_TARGET,
                                        "Error sending validation completion"
                                    );
                                }
                            },
                        }
                    }
                }
                .boxed(),
            );
        }
        let handler = StatementHandler {
            protocol_name: self.protocol_name,
            notification_service: self.notification_service,
            propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
                as Pin<Box<dyn Stream<Item = ()> + Send>>)
                .fuse(),
            pending_statements: FuturesUnordered::new(),
            pending_statements_peers: HashMap::new(),
            network,
            sync,
            sync_event_stream: sync_event_stream.fuse(),
            peers: HashMap::new(),
            statement_store,
            queue_sender,
            statements_per_second,
            metrics,
            initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
            pending_affinities_timeout: Box::pin(
                tokio::time::sleep(PENDING_AFFINITIES_INTERVAL).fuse(),
            ),
            pending_initial_syncs: HashMap::new(),
            initial_sync_peer_queue: VecDeque::new(),
            deferred_peers: HashSet::new(),
            dropped_statements_during_sync: false,
            sync_recovery_peer: None,
            // Disarmed until a recovery peer is actually removed.
            sync_recovery_readd_timeout: Box::pin(pending().fuse()),
        };
        Ok(handler)
    }
}
/// Handler of statement gossiping: propagates locally stored statements,
/// imports statements received from peers, and tracks per-peer state.
pub struct StatementHandler<
    N: NetworkPeers + NetworkEventStream,
    S: SyncEventStream + sp_consensus::SyncOracle,
> {
    protocol_name: ProtocolName,
    // Tick stream driving periodic propagation of recent statements.
    propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
    // In-flight validations; each resolves to the statement hash and, if the
    // worker responded, the submission result.
    pending_statements:
        FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
    // Peers that sent us each statement currently being validated.
    pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
    // Network handle used for reputation reporting and reserved-set updates.
    network: N,
    // Sync oracle; gossip is paused while major-syncing.
    sync: S,
    sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
    notification_service: Box<dyn NotificationService>,
    // All peers currently connected on the statement protocol.
    peers: HashMap<PeerId, Peer>,
    statement_store: Arc<dyn StatementStore>,
    // Channel feeding the submission worker tasks.
    queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
    // Per-peer sustained rate limit for incoming statements.
    statements_per_second: NonZeroU32,
    metrics: Option<Metrics>,
    // Timer for the next initial-sync burst round.
    initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
    // Timer for activating pending topic-affinity filters.
    pending_affinities_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
    // Initial-sync state, per peer.
    pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
    // Round-robin queue of peers with a pending initial sync.
    initial_sync_peer_queue: VecDeque<PeerId>,
    // Peers whose connection was deferred while major-syncing.
    deferred_peers: HashSet<PeerId>,
    // Set when notifications were dropped during major sync; triggers recovery.
    dropped_statements_during_sync: bool,
    // Peer temporarily removed from the reserved set for sync recovery.
    sync_recovery_peer: Option<PeerId>,
    // Timer after which the sync-recovery peer is re-added.
    sync_recovery_readd_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
}
/// Per-peer token-bucket rate limiter for incoming statement traffic.
#[derive(Debug)]
struct PeerRateLimiter {
    limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
impl PeerRateLimiter {
    /// Creates a direct (non-keyed) limiter with the given sustained rate and
    /// burst allowance.
    fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
        Self {
            limiter: RateLimiter::direct(
                Quota::per_second(statements_per_second).allow_burst(burst),
            ),
        }
    }
    /// Returns `true` when accepting `count` more items would exceed the quota.
    fn is_flooding(&self, count: usize) -> bool {
        // Anything beyond `u32::MAX` cannot possibly fit any quota.
        let Ok(count) = u32::try_from(count) else {
            return true;
        };
        match NonZeroU32::new(count) {
            // Zero items never flood.
            None => false,
            // Flooding unless the limiter accepted the full batch.
            Some(n) => !matches!(self.limiter.check_n(n), Ok(Ok(()))),
        }
    }
}
/// Per-peer connection state.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
    // Statement hashes this peer is assumed to already know.
    known_statements: LruHashSet<Hash>,
    // Token bucket limiting the peer's incoming statement rate.
    rate_limiter: PeerRateLimiter,
    // Negotiated statement protocol version.
    protocol_version: PeerProtocolVersion,
    // Active topic-affinity filter the peer asked us to apply when sending.
    topic_affinity: Option<AffinityFilter>,
    // Whether the peer's role is a light client.
    is_light: bool,
    // Filter received but not yet activated (applied on the affinities tick).
    pending_topic_affinity: Option<AffinityFilter>,
}
/// State of an in-progress initial sync towards one peer.
struct PendingInitialSync {
    // Statement hashes still to be offered to the peer.
    hashes: Vec<Hash>,
    // When the sync started; used for the duration metric.
    started_at: Instant,
}
/// Outcome of sizing a chunk of statements against the payload budget.
enum ChunkResult {
    // Number of leading statements that fit (0 only for an empty input).
    Send(usize),
    // The first statement alone exceeds the payload budget.
    SkipOversized,
}
/// Outcome of attempting to send one chunk of statements to a peer.
enum SendChunkResult {
    // A chunk of this many statements was sent.
    Sent(usize),
    // The first statement was oversized and should be skipped.
    Skipped,
    // Nothing to send.
    Empty,
    // Peer unknown, or the notification send failed / timed out.
    Failed,
}
// Envelope overhead of a v1 notification: the maximum SCALE compact length
// prefix of the statement vector.
const V1_ENVELOPE_OVERHEAD: usize = 5;
// Envelope overhead of a v2 notification: one variant-index byte on top of
// the v1 overhead.
const V2_ENVELOPE_OVERHEAD: usize = 1 + V1_ENVELOPE_OVERHEAD;
/// Maximum number of payload bytes available for statements in a single
/// notification once `envelope_overhead` bytes of framing are reserved.
fn max_statement_payload_size(envelope_overhead: usize) -> usize {
    debug_assert_eq!(
        V1_ENVELOPE_OVERHEAD,
        Compact::<u32>::max_encoded_len(),
        "V1_ENVELOPE_OVERHEAD must equal Compact::<u32>::max_encoded_len()"
    );
    MAX_STATEMENT_NOTIFICATION_SIZE as usize - envelope_overhead
}
/// Determines how many statements from the front of `statements` fit into one
/// notification once `envelope_overhead` bytes are reserved for framing.
///
/// Returns `ChunkResult::Send(n)` with the number of leading statements that
/// fit (`Send(0)` only for an empty slice), or `ChunkResult::SkipOversized`
/// when even the first statement alone exceeds the payload budget.
fn find_sendable_chunk(statements: &[&Statement], envelope_overhead: usize) -> ChunkResult {
    if statements.is_empty() {
        return ChunkResult::Send(0);
    }
    let max_size = max_statement_payload_size(envelope_overhead);
    let mut accumulated_size = 0;
    let mut count = 0;
    // Greedily take statements while the running encoded size stays within
    // the budget. (Cleanup: the former `&statements[0..]` full-range slice
    // and the `new_count` temporary added nothing.)
    for stmt in statements {
        let stmt_size = stmt.encoded_size();
        if accumulated_size + stmt_size > max_size {
            break;
        }
        accumulated_size += stmt_size;
        count += 1;
    }
    if count == 0 {
        ChunkResult::SkipOversized
    } else {
        ChunkResult::Send(count)
    }
}
impl Peer {
    /// Builds a peer record directly for tests: v1 protocol, full node, no
    /// topic affinity.
    #[cfg(any(test, feature = "test-helpers"))]
    pub fn new_for_testing(
        known_statements: LruHashSet<Hash>,
        statements_per_second: NonZeroU32,
        burst: NonZeroU32,
    ) -> Self {
        let rate_limiter = PeerRateLimiter::new(statements_per_second, burst);
        Self {
            known_statements,
            rate_limiter,
            protocol_version: PeerProtocolVersion::V1,
            topic_affinity: None,
            is_light: false,
            pending_topic_affinity: None,
        }
    }
    /// Whether this peer is currently eligible to receive statements: every
    /// peer is, except a light client on v2 that has not yet provided a
    /// topic-affinity filter.
    fn can_receive(&self) -> bool {
        !self.is_light ||
            self.protocol_version != PeerProtocolVersion::V2 ||
            self.topic_affinity.is_some()
    }
}
impl<N, S> StatementHandler<N, S>
where
N: NetworkPeers + NetworkEventStream,
S: SyncEventStream + sp_consensus::SyncOracle,
{
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_for_testing(
protocol_name: ProtocolName,
notification_service: Box<dyn NotificationService>,
propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
network: N,
sync: S,
sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
peers: HashMap<PeerId, Peer>,
statement_store: Arc<dyn StatementStore>,
queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
statements_per_second: NonZeroU32,
) -> Self {
Self {
protocol_name,
notification_service,
propagate_timeout,
pending_statements: FuturesUnordered::new(),
pending_statements_peers: HashMap::new(),
network,
sync,
sync_event_stream,
peers,
statement_store,
queue_sender,
statements_per_second,
metrics: None,
initial_sync_timeout: Box::pin(pending().fuse()),
pending_affinities_timeout: Box::pin(pending().fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
deferred_peers: HashSet::new(),
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(pending().fuse()),
}
}
#[cfg(any(test, feature = "test-helpers"))]
pub fn pending_statements_mut(
&mut self,
) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
{
&mut self.pending_statements
}
    /// Runs the handler's main event loop; returns only when the sync event
    /// stream or the notification service terminates.
    pub async fn run(mut self) {
        loop {
            futures::select_biased! {
                // Periodic tick: propagate recently stored statements and
                // sample the pending-validation gauge.
                _ = self.propagate_timeout.next() => {
                    self.propagate_statements().await;
                    self.metrics.as_ref().map(|metrics| {
                        metrics.pending_statements.set(self.pending_statements.len() as u64);
                    });
                },
                // A queued validation finished: update the reputation of every
                // peer that sent this statement.
                (hash, result) = self.pending_statements.select_next_some() => {
                    if let Some(peers) = self.pending_statements_peers.remove(&hash) {
                        if let Some(result) = result {
                            peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
                        }
                    } else {
                        log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
                    }
                },
                sync_event = self.sync_event_stream.next() => {
                    if let Some(sync_event) = sync_event {
                        self.handle_sync_event(sync_event);
                    } else {
                        // Sync event stream closed: shut down.
                        return;
                    }
                }
                event = self.notification_service.next_event().fuse() => {
                    if let Some(event) = event {
                        self.handle_notification_event(event).await
                    } else {
                        // Notification service closed: shut down.
                        return
                    }
                }
                // Next initial-sync burst round, then re-arm the timer.
                _ = &mut self.initial_sync_timeout => {
                    self.process_initial_sync_burst().await;
                    self.initial_sync_timeout =
                        Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
                },
                // Activate topic-affinity filters received since the last tick.
                _ = &mut self.pending_affinities_timeout => {
                    self.process_pending_affinities();
                    self.pending_affinities_timeout =
                        Box::pin(tokio::time::sleep(PENDING_AFFINITIES_INTERVAL).fuse());
                },
                // Recovery window elapsed: re-add the removed peer and disarm.
                _ = &mut self.sync_recovery_readd_timeout => {
                    self.try_readd_sync_recovery_peer();
                    self.sync_recovery_readd_timeout = Box::pin(pending().fuse());
                },
            }
            // After each event: once out of major sync, connect any deferred
            // peers and kick off recovery of statements dropped during sync.
            if !self.sync.is_major_syncing() {
                self.drain_deferred_peers();
                self.start_sync_recovery();
            }
        }
    }
    /// Sends to `peer` the largest prefix of `statements` that fits into one
    /// notification.
    ///
    /// Returns how many statements were sent, or whether the first statement
    /// was oversized, the input empty, or the send failed / timed out.
    async fn send_statement_chunk(
        &mut self,
        peer: &PeerId,
        statements: &[&Statement],
    ) -> SendChunkResult {
        let Some(peer_data) = self.peers.get(peer) else {
            log::error!(target: LOG_TARGET, "Peer {peer} not found in peers map during send_statement_chunk");
            return SendChunkResult::Failed;
        };
        let peer_version = peer_data.protocol_version;
        let envelope_overhead = peer_version.envelope_overhead();
        match find_sendable_chunk(statements, envelope_overhead) {
            ChunkResult::Send(0) => SendChunkResult::Empty,
            ChunkResult::Send(chunk_end) => {
                let chunk = &statements[..chunk_end];
                // v1 peers expect a bare statement vector; v2 peers a
                // `StatementMessage` envelope.
                let encoded = match peer_version {
                    PeerProtocolVersion::V1 => chunk.encode(),
                    PeerProtocolVersion::V2 => StatementMessage::encode_statement_refs(chunk),
                };
                let bytes_to_send = encoded.len() as u64;
                let sent_latency_timer =
                    self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
                // Bound the async send so one unresponsive peer cannot stall
                // the handler indefinitely.
                let send_result = timeout(
                    SEND_TIMEOUT,
                    self.notification_service.send_async_notification(peer, encoded),
                )
                .await;
                drop(sent_latency_timer);
                // NOTE(review): only the timeout error is handled here; if the
                // inner send itself returns a `Result`, its `Err` is treated
                // as a successful send — confirm this is intended.
                if let Err(e) = send_result {
                    log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
                    return SendChunkResult::Failed;
                }
                log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
                self.metrics.as_ref().map(|metrics| {
                    metrics.propagated_statements.inc_by(chunk.len() as u64);
                    metrics.bytes_sent_total.inc_by(bytes_to_send);
                    metrics.propagated_statements_chunks.observe(chunk.len() as f64);
                });
                SendChunkResult::Sent(chunk_end)
            },
            ChunkResult::SkipOversized => {
                log::warn!(target: LOG_TARGET, "Statement too large, skipping");
                self.metrics.as_ref().map(|metrics| {
                    metrics.skipped_oversized_statements.inc();
                });
                SendChunkResult::Skipped
            },
        }
    }
fn drain_deferred_peers(&mut self) {
if self.deferred_peers.is_empty() {
return;
}
log::debug!(
target: LOG_TARGET,
"Major sync complete, adding {} deferred statement peers",
self.deferred_peers.len(),
);
let addrs: HashSet<multiaddr::Multiaddr> = self
.deferred_peers
.drain()
.map(|p| {
iter::once(multiaddr::Protocol::P2p(p.into())).collect::<multiaddr::Multiaddr>()
})
.collect();
if let Err(err) = self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs)
{
log::warn!(target: LOG_TARGET, "Failed to add deferred peers: {err}");
}
}
    /// If statements were dropped during major sync, temporarily removes one
    /// random connected peer from the reserved set; its subsequent
    /// reconnection triggers a full initial sync that recovers the missed
    /// statements.
    fn start_sync_recovery(&mut self) {
        if !self.dropped_statements_during_sync {
            return;
        }
        // Consume the flag even if recovery cannot start right now.
        self.dropped_statements_during_sync = false;
        // Only one recovery cycle at a time.
        if self.sync_recovery_peer.is_some() {
            return;
        }
        let Some(&peer_id) = self.peers.keys().choose(&mut rand::thread_rng()) else {
            return;
        };
        log::trace!(
            target: LOG_TARGET,
            "Major sync complete, force-reconnecting {peer_id} for statement recovery",
        );
        if let Err(err) = self.network.remove_peers_from_reserved_set(
            self.protocol_name.clone(),
            iter::once(peer_id).collect(),
        ) {
            log::warn!(target: LOG_TARGET, "Failed to remove peer {peer_id} for sync recovery: {err}");
            return;
        }
        self.sync_recovery_peer = Some(peer_id);
        // Re-add the peer after a grace period even if it never reconnects.
        self.sync_recovery_readd_timeout =
            Box::pin(tokio::time::sleep(SYNC_RECOVERY_READD_DELAY).fuse());
    }
fn try_readd_sync_recovery_peer(&mut self) {
let Some(peer_id) = self.sync_recovery_peer.take() else { return };
log::trace!(
target: LOG_TARGET,
"Re-adding {peer_id} to reserved set after sync recovery window",
);
let addr =
iter::once(multiaddr::Protocol::P2p(peer_id.into())).collect::<multiaddr::Multiaddr>();
if let Err(err) = self
.network
.add_peers_to_reserved_set(self.protocol_name.clone(), iter::once(addr).collect())
{
log::warn!(target: LOG_TARGET, "Failed to re-add sync recovery peer {peer_id}: {err}");
}
}
fn handle_sync_event(&mut self, event: SyncEvent) {
match event {
SyncEvent::PeerConnected(remote) => {
if self.sync.is_major_syncing() {
log::trace!(
target: LOG_TARGET,
"Major sync in progress, deferring connection to {remote}",
);
self.deferred_peers.insert(remote);
return;
}
let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
.collect::<multiaddr::Multiaddr>();
let result = self.network.add_peers_to_reserved_set(
self.protocol_name.clone(),
iter::once(addr).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
}
},
SyncEvent::PeerDisconnected(remote) => {
if self.deferred_peers.remove(&remote) {
return;
}
let result = self.network.remove_peers_from_reserved_set(
self.protocol_name.clone(),
iter::once(remote).collect(),
);
if let Err(err) = result {
log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
}
},
}
}
    /// Processes one event from the notification service: substream
    /// validation, peer (dis)connection, and incoming statement messages.
    async fn handle_notification_event(&mut self, event: NotificationEvent) {
        match event {
            // Accept inbound substreams only from peers whose role can be
            // determined from the handshake.
            NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
                let result = self
                    .network
                    .peer_role(peer, handshake)
                    .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
                let _ = result_tx.send(result);
            },
            NotificationEvent::NotificationStreamOpened {
                peer,
                negotiated_fallback,
                handshake,
                ..
            } => {
                // A negotiated fallback means the peer only speaks legacy v1.
                let protocol_version = if negotiated_fallback.is_some() {
                    PeerProtocolVersion::V1
                } else {
                    PeerProtocolVersion::V2
                };
                let Some(peer_role) = self.network.peer_role(peer, handshake) else {
                    log::debug!(
                        target: LOG_TARGET,
                        "Peer {peer} connected but role could not be determined, ignoring"
                    );
                    return;
                };
                let is_light = peer_role.is_light();
                log::debug!(
                    target: LOG_TARGET,
                    "Peer {peer} connected with statement protocol {protocol_version:?}, role={peer_role:?}"
                );
                let _was_in = self.peers.insert(
                    peer,
                    Peer {
                        known_statements: LruHashSet::new(
                            NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
                        ),
                        // Burst capacity is a multiple of the sustained rate.
                        rate_limiter: PeerRateLimiter::new(
                            self.statements_per_second,
                            NonZeroU32::new(
                                self.statements_per_second.get() *
                                    config::STATEMENTS_BURST_COEFFICIENT,
                            )
                            .expect("burst capacity is nonzero"),
                        ),
                        protocol_version,
                        topic_affinity: None,
                        is_light,
                        pending_topic_affinity: None,
                    },
                );
                debug_assert!(_was_in.is_none());
                self.metrics.as_ref().map(|metrics| {
                    metrics.peers_connected.set(self.peers.len() as u64);
                });
                // Light v2 peers only become eligible once they provide a
                // topic-affinity filter; see `Peer::can_receive`.
                if self.peers.get(&peer).map_or(false, |p| p.can_receive()) {
                    self.schedule_initial_sync_for_peer(peer);
                }
            },
            NotificationEvent::NotificationStreamClosed { peer } => {
                let _peer = self.peers.remove(&peer);
                debug_assert!(_peer.is_some());
                // Close out the peer's initial sync (if any) for the metrics.
                if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
                    self.metrics.as_ref().map(|metrics| {
                        metrics.initial_sync_peers_active.dec();
                        metrics
                            .initial_sync_duration_seconds
                            .observe(pending.started_at.elapsed().as_secs_f64());
                    });
                }
                self.initial_sync_peer_queue.retain(|p| *p != peer);
                self.metrics.as_ref().map(|metrics| {
                    metrics.peers_connected.set(self.peers.len() as u64);
                });
            },
            NotificationEvent::NotificationReceived { peer, notification } => {
                let bytes_received = notification.len() as u64;
                self.metrics.as_ref().map(|metrics| {
                    metrics.bytes_received_total.inc_by(bytes_received);
                });
                // While major-syncing, incoming statements are dropped; the
                // flag triggers a recovery reconnect once sync finishes.
                if self.sync.is_major_syncing() {
                    log::trace!(
                        target: LOG_TARGET,
                        "{peer}: Ignoring statements while major syncing or offline"
                    );
                    self.dropped_statements_during_sync = true;
                    return;
                }
                let Some(peer_data) = self.peers.get(&peer) else {
                    log::error!(target: LOG_TARGET, "Received notification from unknown peer {peer}");
                    return;
                };
                match peer_data.protocol_version {
                    PeerProtocolVersion::V1 => {
                        if let Ok(statements) =
                            <Statements as Decode>::decode(&mut notification.as_ref())
                        {
                            self.on_statements(peer, statements);
                        } else {
                            log::debug!(
                                target: LOG_TARGET,
                                "Failed to decode v1 statement list from {peer}"
                            );
                            self.network.report_peer(peer, rep::BAD_MESSAGE);
                        }
                    },
                    PeerProtocolVersion::V2 => {
                        if let Ok(message) = StatementMessage::decode(&mut notification.as_ref()) {
                            match message {
                                StatementMessage::Statements(statements) => {
                                    self.on_statements(peer, statements)
                                },
                                StatementMessage::ExplicitTopicAffinity(filter) => {
                                    if let Some(peer_data) = self.peers.get_mut(&peer) {
                                        // Affinity updates count against the
                                        // same rate limit as statements.
                                        if peer_data.rate_limiter.is_flooding(1) {
                                            log::debug!(
                                                target: LOG_TARGET,
                                                "Rate-limiting ExplicitTopicAffinity from {peer}"
                                            );
                                            self.network.report_peer(peer, rep::BAD_MESSAGE);
                                        } else {
                                            log::debug!(
                                                target: LOG_TARGET,
                                                "Received topic affinity filter from {peer}"
                                            );
                                            // Stored as pending; activated on
                                            // the next affinities tick.
                                            peer_data.pending_topic_affinity = Some(filter);
                                        }
                                    }
                                },
                            }
                        } else {
                            log::debug!(
                                target: LOG_TARGET,
                                "Failed to decode v2 statement message from {peer}"
                            );
                            self.network.report_peer(peer, rep::BAD_MESSAGE);
                        }
                    },
                }
            },
        }
    }
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
self.metrics.as_ref().map(|metrics| {
metrics.statements_received.inc_by(statements.len() as u64);
});
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if peer.rate_limiter.is_flooding(statements.len()) {
log::warn!(
target: LOG_TARGET,
"Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
who,
self.statements_per_second
);
self.network.report_peer(who, rep::STATEMENT_FLOODING);
self.network.disconnect_peer(who, self.protocol_name.clone());
if let Some(ref metrics) = self.metrics {
metrics.statement_flooding_detected.inc();
}
return;
}
let mut statements_left = statements.len() as u64;
for s in statements {
if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
log::debug!(
target: LOG_TARGET,
"Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
statements_left,
MAX_PENDING_STATEMENTS,
);
self.metrics.as_ref().map(|metrics| {
metrics.ignored_statements.inc_by(statements_left);
});
break;
}
let hash = s.hash();
peer.known_statements.insert(hash);
if self.statement_store.has_statement(&hash) {
self.metrics.as_ref().map(|metrics| {
metrics.known_statements_received.inc();
});
if let Some(peers) = self.pending_statements_peers.get(&hash) {
if peers.contains(&who) {
log::trace!(
target: LOG_TARGET,
"Already received the statement from the same peer {who}.",
);
self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
}
}
continue;
}
self.network.report_peer(who, rep::ANY_STATEMENT);
match self.pending_statements_peers.entry(hash) {
Entry::Vacant(entry) => {
let (completion_sender, completion_receiver) = oneshot::channel();
match self.queue_sender.try_send((s, completion_sender)) {
Ok(()) => {
self.pending_statements.push(
async move {
let res = completion_receiver.await;
(hash, res.ok())
}
.boxed(),
);
entry.insert(HashSet::from_iter([who]));
},
Err(async_channel::TrySendError::Full(_)) => {
log::debug!(
target: LOG_TARGET,
"Dropped statement because validation channel is full",
);
},
Err(async_channel::TrySendError::Closed(_)) => {
log::trace!(
target: LOG_TARGET,
"Dropped statement because validation channel is closed",
);
},
}
},
Entry::Occupied(mut entry) => {
if !entry.get_mut().insert(who) {
self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
}
},
}
statements_left -= 1;
}
}
}
fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
match import {
SubmitResult::New => self.network.report_peer(who, rep::GOOD_STATEMENT),
SubmitResult::Known => self.network.report_peer(who, rep::ANY_STATEMENT_REFUND),
SubmitResult::KnownExpired => {},
SubmitResult::Rejected(_) => {},
SubmitResult::Invalid(_) => self.network.report_peer(who, rep::INVALID_STATEMENT),
SubmitResult::InternalError(_) => {},
}
}
pub async fn propagate_statement(&mut self, hash: &Hash) {
if self.sync.is_major_syncing() {
return;
}
log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
if let Ok(Some(statement)) = self.statement_store.statement(hash) {
self.do_propagate_statements(&[(*hash, statement)]).await;
}
}
    /// Sends to `who` every statement in `statements` it is not already
    /// assumed to know, honoring the peer's topic-affinity filter.
    async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
        let Some(peer) = self.peers.get_mut(who) else {
            return;
        };
        if !peer.can_receive() {
            return;
        }
        let to_send: Vec<_> = statements
            .iter()
            .filter_map(|(hash, stmt)| {
                if peer.known_statements.contains(hash) {
                    return None;
                }
                if peer.topic_affinity.as_ref().is_some_and(|a| !a.matches_statement(stmt)) {
                    return None;
                }
                // Marked as known up front; note this sticks even if the
                // send below subsequently fails.
                peer.known_statements.insert(*hash);
                Some(stmt)
            })
            .collect();
        log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", to_send.len());
        if to_send.is_empty() {
            return;
        }
        self.send_statements_in_chunks(who, &to_send).await;
    }
async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
let mut offset = 0;
while offset < statements.len() {
match self.send_statement_chunk(who, &statements[offset..]).await {
SendChunkResult::Sent(chunk_end) => {
offset += chunk_end;
},
SendChunkResult::Skipped => {
offset += 1;
},
SendChunkResult::Empty | SendChunkResult::Failed => return,
}
}
}
async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
let peers: Vec<_> = self.peers.keys().copied().collect();
for who in peers {
log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
self.send_statements_to_peer(&who, statements).await;
}
log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
}
async fn propagate_statements(&mut self) {
if self.sync.is_major_syncing() {
return;
}
let Ok(statements) = self.statement_store.take_recent_statements() else { return };
if !statements.is_empty() {
self.do_propagate_statements(&statements).await;
}
}
    /// (Re)starts the initial statement sync for `peer`: snapshots all hashes
    /// currently in the store and queues the peer for burst rounds.
    fn schedule_initial_sync_for_peer(&mut self, peer: PeerId) {
        // If a sync is already running for this peer, close it out first.
        if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
            self.record_initial_sync_completion(pending.started_at);
            self.initial_sync_peer_queue.retain(|p| *p != peer);
        }
        let hashes = self.statement_store.statement_hashes();
        // Forget what the peer is assumed to know so everything is offered
        // again (relevant when re-syncing after a topic-affinity change).
        if let Some(peer_data) = self.peers.get_mut(&peer) {
            peer_data.known_statements.clear();
        }
        if !hashes.is_empty() {
            self.pending_initial_syncs
                .insert(peer, PendingInitialSync { hashes, started_at: Instant::now() });
            self.initial_sync_peer_queue.push_back(peer);
            self.metrics.as_ref().map(|metrics| {
                metrics.initial_sync_peers_active.inc();
            });
        }
    }
fn process_pending_affinities(&mut self) {
let ready_peers: Vec<PeerId> = self
.peers
.iter()
.filter(|(peer_id, peer_data)| {
peer_data.pending_topic_affinity.is_some() &&
!self.pending_initial_syncs.contains_key(peer_id)
})
.map(|(peer_id, _)| *peer_id)
.collect();
for peer_id in ready_peers {
if let Some(peer_data) = self.peers.get_mut(&peer_id) {
peer_data.topic_affinity = peer_data.pending_topic_affinity.take();
}
self.schedule_initial_sync_for_peer(peer_id);
}
}
fn record_initial_sync_completion(&self, started_at: Instant) {
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_peers_active.dec();
metrics
.initial_sync_duration_seconds
.observe(started_at.elapsed().as_secs_f64());
});
}
/// Send one "burst" (a single size-limited notification) of initial-sync
/// statements to the peer at the head of the round-robin queue.
///
/// Bursts are interleaved across peers via `initial_sync_peer_queue`: a peer
/// with hashes remaining is re-queued at the back after each burst. Skipped
/// entirely while major sync is in progress.
async fn process_initial_sync_burst(&mut self) {
    if self.sync.is_major_syncing() {
        return;
    }
    let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
        return;
    };
    let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
        return;
    };
    // `if let` instead of `Option::map` for a side effect
    // (clippy: `option_map_unit_fn`).
    if let Some(metrics) = self.metrics.as_ref() {
        metrics.initial_sync_bursts_total.inc();
    }
    // Nothing left to send: the sync is complete.
    if entry.get().hashes.is_empty() {
        let started_at = entry.get().started_at;
        entry.remove();
        self.record_initial_sync_completion(started_at);
        return;
    }
    let Some(peer_data) = self.peers.get(&peer_id) else {
        log::error!(target: LOG_TARGET, "Peer {peer_id} has pending initial sync but is not in peers map");
        // Fix: record completion here as well, so the
        // `initial_sync_peers_active` gauge (incremented when the sync was
        // scheduled) is not leaked on this error path.
        let started_at = entry.get().started_at;
        entry.remove();
        self.record_initial_sync_completion(started_at);
        return;
    };
    let envelope_overhead = peer_data.protocol_version.envelope_overhead();
    let max_size = max_statement_payload_size(envelope_overhead);
    let mut accumulated_size = 0;
    // Select as many unsent, affinity-matching statements as fit into a
    // single notification payload.
    let (statements, processed) = match self.statement_store.statements_by_hashes(
        &entry.get().hashes,
        &mut |hash, encoded, stmt| {
            if peer_data.known_statements.contains(hash) {
                return FilterDecision::Skip;
            }
            if peer_data.topic_affinity.as_ref().is_some_and(|a| !a.matches_statement(stmt)) {
                return FilterDecision::Skip;
            }
            // Always take at least one statement (even if it alone exceeds
            // the budget); otherwise stop once the payload would overflow.
            if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
                return FilterDecision::Abort;
            }
            accumulated_size += encoded.len();
            FilterDecision::Take
        },
    ) {
        Ok(r) => r,
        Err(e) => {
            log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
            let started_at = entry.get().started_at;
            entry.remove();
            self.record_initial_sync_completion(started_at);
            return;
        },
    };
    // Drop the hashes we just examined (taken or skipped) from the pending
    // set; anything past `processed` is retried on the next burst.
    entry.get_mut().hashes.drain(..processed);
    let has_more = !entry.get().hashes.is_empty();
    drop(entry);
    let send_stmts: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
    match self.send_statement_chunk(&peer_id, &send_stmts).await {
        SendChunkResult::Failed => {
            // Send failed: abandon the whole initial sync for this peer.
            if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
                self.record_initial_sync_completion(pending.started_at);
            }
            return;
        },
        SendChunkResult::Sent(sent) => {
            debug_assert_eq!(send_stmts.len(), sent);
            if let Some(metrics) = self.metrics.as_ref() {
                metrics.initial_sync_statements_sent.inc_by(sent as u64);
            }
            // Remember what the peer now knows so we never re-send it.
            if let Some(peer) = self.peers.get_mut(&peer_id) {
                for (hash, _) in &statements {
                    peer.known_statements.insert(*hash);
                }
            }
        },
        SendChunkResult::Empty | SendChunkResult::Skipped => {},
    }
    if has_more {
        // Round-robin: give other peers a turn before this peer's next burst.
        self.initial_sync_peer_queue.push_back(peer_id);
    } else if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
        self.record_initial_sync_completion(pending.started_at);
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{
atomic::{AtomicBool, Ordering},
Mutex,
};
// Seed for deterministic bloom-filter construction in tests.
// NOTE(review): not referenced in this chunk — presumably used by affinity
// tests further down; confirm before removing.
const BLOOM_SEED: u128 = 0x5EED_5EED_5EED_5EED;
/// Mock network that records every call so tests can assert on reported
/// reputations, disconnects and reserved-set changes.
#[derive(Clone)]
struct TestNetwork {
    // (peer, reputation change) pairs passed to `report_peer`.
    reported_peers: Arc<Mutex<Vec<(PeerId, sc_network::ReputationChange)>>>,
    // Peers passed to `disconnect_peer`.
    disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
    // Role returned by `peer_role` for every peer.
    default_role: sc_network::ObservedRole,
    // Address sets passed to `add_peers_to_reserved_set`.
    added_reserved: Arc<Mutex<Vec<HashSet<sc_network::Multiaddr>>>>,
    // Peer lists passed to `remove_peers_from_reserved_set`.
    removed_reserved: Arc<Mutex<Vec<Vec<PeerId>>>>,
}
impl TestNetwork {
    /// A full-role mock network with empty call recorders.
    fn new() -> Self {
        Self {
            reported_peers: Arc::new(Mutex::new(Vec::new())),
            disconnected_peers: Arc::new(Mutex::new(Vec::new())),
            default_role: sc_network::ObservedRole::Full,
            added_reserved: Arc::new(Mutex::new(Vec::new())),
            removed_reserved: Arc::new(Mutex::new(Vec::new())),
        }
    }
    /// Same as [`Self::new`], but peers are observed as light clients.
    fn new_light() -> Self {
        // Reuse `new` via struct-update syntax instead of duplicating the
        // whole field list (keeps the two constructors in sync).
        Self { default_role: sc_network::ObservedRole::Light, ..Self::new() }
    }
    /// Snapshot of all `(peer, reputation change)` pairs reported so far.
    fn get_reports(&self) -> Vec<(PeerId, sc_network::ReputationChange)> {
        self.reported_peers.lock().unwrap().clone()
    }
    /// Snapshot of all peers passed to `disconnect_peer` so far.
    fn get_disconnected_peers(&self) -> Vec<PeerId> {
        self.disconnected_peers.lock().unwrap().clone()
    }
    /// Snapshot of all address sets added to the reserved set so far.
    fn get_added_reserved(&self) -> Vec<HashSet<sc_network::Multiaddr>> {
        self.added_reserved.lock().unwrap().clone()
    }
    /// Snapshot of all peer lists removed from the reserved set so far.
    fn get_removed_reserved(&self) -> Vec<Vec<PeerId>> {
        self.removed_reserved.lock().unwrap().clone()
    }
}
#[async_trait::async_trait]
impl NetworkPeers for TestNetwork {
    // Only the methods the statement handler actually calls are implemented;
    // everything else panics via `unimplemented!()` so an unexpected call
    // fails the test loudly.
    fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
        unimplemented!()
    }
    fn set_authorized_only(&self, _: bool) {
        unimplemented!()
    }
    fn add_known_address(&self, _: PeerId, _: sc_network::Multiaddr) {
        unimplemented!()
    }
    // Records the report for later inspection via `get_reports`.
    fn report_peer(&self, peer_id: PeerId, cost_benefit: sc_network::ReputationChange) {
        self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
    }
    fn peer_reputation(&self, _: &PeerId) -> i32 {
        unimplemented!()
    }
    // Records the disconnect for later inspection via `get_disconnected_peers`.
    fn disconnect_peer(&self, peer: PeerId, _: sc_network::ProtocolName) {
        self.disconnected_peers.lock().unwrap().push(peer);
    }
    fn accept_unreserved_peers(&self) {
        unimplemented!()
    }
    fn deny_unreserved_peers(&self) {
        unimplemented!()
    }
    fn add_reserved_peer(
        &self,
        _: sc_network::config::MultiaddrWithPeerId,
    ) -> Result<(), String> {
        unimplemented!()
    }
    fn remove_reserved_peer(&self, _: PeerId) {
        unimplemented!()
    }
    fn set_reserved_peers(
        &self,
        _: sc_network::ProtocolName,
        _: std::collections::HashSet<sc_network::Multiaddr>,
    ) -> Result<(), String> {
        unimplemented!()
    }
    // Records the added addresses for later inspection via `get_added_reserved`.
    fn add_peers_to_reserved_set(
        &self,
        _: sc_network::ProtocolName,
        addrs: std::collections::HashSet<sc_network::Multiaddr>,
    ) -> Result<(), String> {
        self.added_reserved.lock().unwrap().push(addrs);
        Ok(())
    }
    // Records the removed peers for later inspection via `get_removed_reserved`.
    fn remove_peers_from_reserved_set(
        &self,
        _: sc_network::ProtocolName,
        peers: Vec<PeerId>,
    ) -> Result<(), String> {
        self.removed_reserved.lock().unwrap().push(peers);
        Ok(())
    }
    fn sync_num_connected(&self) -> usize {
        unimplemented!()
    }
    // Every peer is reported with the mock's configured role.
    fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<sc_network::ObservedRole> {
        Some(self.default_role)
    }
    async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
        unimplemented!();
    }
}
/// Mock sync oracle whose "major syncing" answer is a shared flag that tests
/// can flip at runtime.
#[derive(Clone)]
struct TestSync {
    // Read by `is_major_syncing`; shared so tests can toggle it.
    major_syncing: Arc<AtomicBool>,
}
impl TestSync {
    /// A sync oracle that starts out "not major-syncing".
    fn new() -> Self {
        let flag = Arc::new(AtomicBool::new(false));
        Self { major_syncing: flag }
    }
    /// A sync oracle with the given initial state, plus a handle tests can
    /// use to flip the state later.
    fn with_syncing(initial: bool) -> (Self, Arc<AtomicBool>) {
        let flag = Arc::new(AtomicBool::new(initial));
        let oracle = Self { major_syncing: Arc::clone(&flag) };
        (oracle, flag)
    }
}
impl SyncEventStream for TestSync {
    /// Returns a stream that never yields: tests drive the handler directly
    /// rather than through sync events.
    fn event_stream(
        &self,
        _name: &'static str,
    ) -> Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>> {
        Box::pin(futures::stream::pending())
    }
}
impl sp_consensus::SyncOracle for TestSync {
    // Reflects the shared flag set via `with_syncing`'s returned handle.
    fn is_major_syncing(&self) -> bool {
        self.major_syncing.load(Ordering::Relaxed)
    }
    // Never called by the code under test.
    fn is_offline(&self) -> bool {
        unimplemented!()
    }
}
impl NetworkEventStream for TestNetwork {
    // Never called by the code under test; present only to satisfy the
    // `StatementHandler` bounds.
    fn event_stream(
        &self,
        _name: &'static str,
    ) -> Pin<Box<dyn Stream<Item = sc_network::Event> + Send>> {
        unimplemented!()
    }
}
/// Mock notification service that records every notification sent so tests
/// can decode and assert on the wire payloads.
#[derive(Debug, Clone)]
struct TestNotificationService {
    // `(peer, payload)` pairs in send order; shared across clones.
    sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
}
impl TestNotificationService {
    /// An empty recorder.
    fn new() -> Self {
        let sent = Arc::new(Mutex::new(Vec::new()));
        Self { sent_notifications: sent }
    }
    /// Snapshot of all `(peer, payload)` notifications sent so far.
    fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
        let guard = self.sent_notifications.lock().unwrap();
        guard.clone()
    }
    /// Forget everything recorded so far.
    fn clear_sent_notifications(&self) {
        let mut guard = self.sent_notifications.lock().unwrap();
        guard.clear();
    }
}
#[async_trait::async_trait]
impl NotificationService for TestNotificationService {
    // Only the send paths and `next_event` are implemented; anything else
    // panics so an unexpected call fails the test loudly.
    async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
        unimplemented!()
    }
    async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
        unimplemented!()
    }
    // Records the notification; the "sync" and "async" variants behave
    // identically in this mock.
    fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
        self.sent_notifications.lock().unwrap().push((*peer, notification));
    }
    async fn send_async_notification(
        &mut self,
        peer: &PeerId,
        notification: Vec<u8>,
    ) -> Result<(), sc_network::error::Error> {
        self.sent_notifications.lock().unwrap().push((*peer, notification));
        Ok(())
    }
    async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
        unimplemented!()
    }
    fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
        unimplemented!()
    }
    // No events are ever produced by this mock.
    async fn next_event(&mut self) -> Option<sc_network::service::traits::NotificationEvent> {
        None
    }
    fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
        unimplemented!()
    }
    fn protocol(&self) -> &sc_network::types::ProtocolName {
        unimplemented!()
    }
    fn message_sink(
        &self,
        _peer: &PeerId,
    ) -> Option<Box<dyn sc_network::service::traits::MessageSink>> {
        unimplemented!()
    }
}
/// In-memory mock statement store.
#[derive(Clone)]
struct TestStatementStore {
    // Backing set for `statements`, `has_statement`, `statement_hashes` and
    // `statements_by_hashes`. Tests insert into it directly.
    statements: Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
    // Drained by `take_recent_statements`; tests insert into it directly to
    // simulate freshly submitted statements.
    recent_statements:
        Arc<Mutex<HashMap<sp_statement_store::Hash, sp_statement_store::Statement>>>,
}
impl TestStatementStore {
fn new() -> Self {
Self { statements: Default::default(), recent_statements: Default::default() }
}
}
impl StatementStore for TestStatementStore {
    // Only the methods the handler exercises are implemented; the rest panic
    // via `unimplemented!()` so an unexpected call fails the test loudly.
    fn statements(
        &self,
    ) -> sp_statement_store::Result<
        Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
    > {
        Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
    }
    // Drains the "recent" map, so each statement is returned at most once.
    // Note: HashMap drain order is unspecified.
    fn take_recent_statements(
        &self,
    ) -> sp_statement_store::Result<
        Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
    > {
        Ok(self.recent_statements.lock().unwrap().drain().collect())
    }
    fn statement(
        &self,
        _hash: &sp_statement_store::Hash,
    ) -> sp_statement_store::Result<Option<sp_statement_store::Statement>> {
        unimplemented!()
    }
    fn has_statement(&self, hash: &sp_statement_store::Hash) -> bool {
        self.statements.lock().unwrap().contains_key(hash)
    }
    fn statement_hashes(&self) -> Vec<sp_statement_store::Hash> {
        self.statements.lock().unwrap().keys().cloned().collect()
    }
    // Walks `hashes` in order, consulting `filter` with the SCALE-encoded
    // statement. Returns the taken statements plus the number of hashes
    // consumed (`Skip` and `Take` both count; unknown hashes count too;
    // `Abort` stops without consuming the current hash).
    fn statements_by_hashes(
        &self,
        hashes: &[sp_statement_store::Hash],
        filter: &mut dyn FnMut(
            &sp_statement_store::Hash,
            &[u8],
            &sp_statement_store::Statement,
        ) -> FilterDecision,
    ) -> sp_statement_store::Result<(
        Vec<(sp_statement_store::Hash, sp_statement_store::Statement)>,
        usize,
    )> {
        let statements = self.statements.lock().unwrap();
        let mut result = Vec::new();
        let mut processed = 0;
        for hash in hashes {
            // Hashes no longer in the store are silently consumed.
            let Some(stmt) = statements.get(hash) else {
                processed += 1;
                continue;
            };
            let encoded = stmt.encode();
            match filter(hash, &encoded, stmt) {
                FilterDecision::Skip => {
                    processed += 1;
                },
                FilterDecision::Take => {
                    processed += 1;
                    result.push((*hash, stmt.clone()));
                },
                FilterDecision::Abort => break,
            }
        }
        Ok((result, processed))
    }
    fn broadcasts(
        &self,
        _match_all_topics: &[sp_statement_store::Topic],
    ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted(
        &self,
        _match_all_topics: &[sp_statement_store::Topic],
        _dest: [u8; 32],
    ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted_clear(
        &self,
        _match_all_topics: &[sp_statement_store::Topic],
        _dest: [u8; 32],
    ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn broadcasts_stmt(
        &self,
        _match_all_topics: &[sp_statement_store::Topic],
    ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted_stmt(
        &self,
        _match_all_topics: &[sp_statement_store::Topic],
        _dest: [u8; 32],
    ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted_clear_stmt(
        &self,
        _match_all_topics: &[sp_statement_store::Topic],
        _dest: [u8; 32],
    ) -> sp_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn submit(
        &self,
        _statement: sp_statement_store::Statement,
        _source: sp_statement_store::StatementSource,
    ) -> sp_statement_store::SubmitResult {
        unimplemented!()
    }
    fn remove(&self, _hash: &sp_statement_store::Hash) -> sp_statement_store::Result<()> {
        unimplemented!()
    }
    fn remove_by(&self, _who: [u8; 32]) -> sp_statement_store::Result<()> {
        unimplemented!()
    }
}
/// Build a `StatementHandler` wired entirely to mocks, with `num_peers`
/// pre-connected V1 full-role peers, and return the handler together with
/// handles to every mock plus the generated peer ids.
fn build_handler(
    num_peers: usize,
) -> (
    StatementHandler<TestNetwork, TestSync>,
    TestStatementStore,
    TestNetwork,
    TestNotificationService,
    async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
    Vec<PeerId>,
) {
    let statement_store = TestStatementStore::new();
    let (queue_sender, queue_receiver) = async_channel::bounded(100);
    let network = TestNetwork::new();
    let notification_service = TestNotificationService::new();
    // Pre-populate the peer map so tests can skip the handshake path.
    let mut peers = HashMap::new();
    let mut peer_ids = Vec::with_capacity(num_peers);
    for _ in 0..num_peers {
        let peer_id = PeerId::random();
        peer_ids.push(peer_id);
        peers.insert(
            peer_id,
            Peer {
                known_statements: LruHashSet::new(NonZeroUsize::new(1000).unwrap()),
                rate_limiter: PeerRateLimiter::new(
                    NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
                        .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
                    NonZeroU32::new(
                        DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
                    )
                    .expect("burst capacity is nonzero"),
                ),
                protocol_version: PeerProtocolVersion::V1,
                topic_affinity: None,
                is_light: false,
                pending_topic_affinity: None,
            },
        );
    }
    // Timers/streams are `pending()` so nothing fires unless a test drives
    // the handler explicitly.
    let handler = StatementHandler {
        protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
        notification_service: Box::new(notification_service.clone()),
        propagate_timeout: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = ()> + Send>>)
            .fuse(),
        pending_statements: FuturesUnordered::new(),
        pending_statements_peers: HashMap::new(),
        network: network.clone(),
        sync: TestSync::new(),
        sync_event_stream: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
            .fuse(),
        peers,
        statement_store: Arc::new(statement_store.clone()),
        queue_sender,
        statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
            .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
        metrics: None,
        initial_sync_timeout: Box::pin(futures::future::pending()),
        pending_affinities_timeout: Box::pin(futures::future::pending()),
        pending_initial_syncs: HashMap::new(),
        initial_sync_peer_queue: VecDeque::new(),
        deferred_peers: HashSet::new(),
        dropped_statements_during_sync: false,
        sync_recovery_peer: None,
        sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
    };
    (handler, statement_store, network, notification_service, queue_receiver, peer_ids)
}
/// Hashes of every statement sent to `peer`, in send order.
fn get_peer_hashes(sent: &[(PeerId, Vec<u8>)], peer: PeerId) -> Vec<sp_statement_store::Hash> {
    let mut hashes = Vec::new();
    for (sender, notification) in sent {
        if *sender != peer {
            continue;
        }
        // Panics on a malformed payload — that is a test failure anyway.
        let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
        hashes.extend(decoded.into_iter().map(|s| s.hash()));
    }
    hashes
}
/// Replay every disconnect recorded by the network mock as a
/// `NotificationStreamClosed` event on the handler, so its peer bookkeeping
/// matches the mock's state.
async fn dispatch_disconnects(
    handler: &mut StatementHandler<TestNetwork, TestSync>,
    network: &TestNetwork,
) {
    let closed = network.get_disconnected_peers();
    for peer in closed {
        let event = NotificationEvent::NotificationStreamClosed { peer };
        handler.handle_notification_event(event).await;
    }
}
#[tokio::test]
async fn test_skips_processing_statements_that_already_in_store() {
    let (mut handler, statement_store, _network, _notification_service, queue_receiver, _) =
        build_handler(1);
    // statement1 is pre-inserted into the store, so the handler should skip it.
    let mut statement1 = Statement::new();
    statement1.set_plain_data(b"statement1".to_vec());
    let hash1 = statement1.hash();
    statement_store.statements.lock().unwrap().insert(hash1, statement1.clone());
    // statement2 is unknown and should be queued for import.
    let mut statement2 = Statement::new();
    statement2.set_plain_data(b"statement2".to_vec());
    let hash2 = statement2.hash();
    let peer_id = *handler.peers.keys().next().unwrap();
    handler.on_statements(peer_id, vec![statement1, statement2]);
    let to_submit = queue_receiver.try_recv();
    assert_eq!(to_submit.unwrap().0.hash(), hash2, "Expected only statement2 to be queued");
    let no_more = queue_receiver.try_recv();
    assert!(no_more.is_err(), "Expected only one statement to be queued");
}
#[tokio::test]
async fn test_reports_for_duplicate_statements() {
    let (mut handler, statement_store, network, _notification_service, queue_receiver, _) =
        build_handler(1);
    let peer_id = *handler.peers.keys().next().unwrap();
    let mut statement1 = Statement::new();
    statement1.set_plain_data(b"statement1".to_vec());
    // First submission: handler queues it (reporting ANY_STATEMENT).
    handler.on_statements(peer_id, vec![statement1.clone()]);
    {
        // Simulate the import pipeline: persist the statement and issue the
        // refund the handler would grant on successful import.
        let (s, _) = queue_receiver.try_recv().unwrap();
        let _ = statement_store.statements.lock().unwrap().insert(s.hash(), s);
        handler.network.report_peer(peer_id, rep::ANY_STATEMENT_REFUND);
    }
    // Second submission of the same statement must be penalized as duplicate.
    handler.on_statements(peer_id, vec![statement1]);
    let reports = network.get_reports();
    assert_eq!(
        reports,
        vec![
            (peer_id, rep::ANY_STATEMENT), (peer_id, rep::ANY_STATEMENT_REFUND), (peer_id, rep::DUPLICATE_STATEMENT) ],
        "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
        reports
    );
}
#[tokio::test]
async fn test_splits_large_batches_into_smaller_chunks() {
    let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
        build_handler(1);
    // 30 statements of ~100KiB each — too large for one notification.
    let num_statements = 30;
    let statement_size = 100 * 1024;
    for i in 0..num_statements {
        let mut statement = Statement::new();
        let mut data = vec![0u8; statement_size];
        // Make each statement (and therefore each hash) unique.
        data[0] = i as u8;
        statement.set_plain_data(data);
        let hash = statement.hash();
        statement_store.recent_statements.lock().unwrap().insert(hash, statement);
    }
    handler.propagate_statements().await;
    let sent = notification_service.get_sent_notifications();
    let mut total_statements_sent = 0;
    assert!(
        sent.len() == 3,
        "Expected batch to be split into 3 chunks, but got {} chunks",
        sent.len()
    );
    // Every chunk must respect the notification size limit, and together the
    // chunks must carry every statement exactly once.
    for (_peer, notification) in sent.iter() {
        assert!(
            notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
            "Notification size {} exceeds limit {}",
            notification.len(),
            MAX_STATEMENT_NOTIFICATION_SIZE
        );
        if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
            total_statements_sent += stmts.len();
        }
    }
    assert_eq!(
        total_statements_sent, num_statements,
        "Expected all {} statements to be sent, but only {} were sent",
        num_statements, total_statements_sent
    );
}
#[tokio::test]
async fn test_skips_only_oversized_statements() {
    let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
        build_handler(1);
    // Interleave small statements with two statements that can never fit in
    // a notification; only the small ones should be propagated.
    let mut statement1 = Statement::new();
    statement1.set_plain_data(vec![1u8; 100]);
    let hash1 = statement1.hash();
    statement_store
        .recent_statements
        .lock()
        .unwrap()
        .insert(hash1, statement1.clone());
    // Grossly oversized: 100x the notification limit.
    let mut oversized1 = Statement::new();
    oversized1.set_plain_data(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
    let hash_oversized1 = oversized1.hash();
    statement_store
        .recent_statements
        .lock()
        .unwrap()
        .insert(hash_oversized1, oversized1);
    let mut statement2 = Statement::new();
    statement2.set_plain_data(vec![3u8; 100]);
    let hash2 = statement2.hash();
    statement_store
        .recent_statements
        .lock()
        .unwrap()
        .insert(hash2, statement2.clone());
    // Borderline oversized: payload alone already equals the whole limit.
    let mut oversized2 = Statement::new();
    oversized2.set_plain_data(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
    let hash_oversized2 = oversized2.hash();
    statement_store
        .recent_statements
        .lock()
        .unwrap()
        .insert(hash_oversized2, oversized2);
    let mut statement3 = Statement::new();
    statement3.set_plain_data(vec![5u8; 100]);
    let hash3 = statement3.hash();
    statement_store
        .recent_statements
        .lock()
        .unwrap()
        .insert(hash3, statement3.clone());
    handler.propagate_statements().await;
    let sent = notification_service.get_sent_notifications();
    let mut sent_hashes = sent
        .iter()
        .flat_map(|(_peer, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect::<Vec<_>>();
    // Order is unspecified (HashMap drain), so compare sorted.
    sent_hashes.sort();
    let mut expected_hashes = vec![hash1, hash2, hash3];
    expected_hashes.sort();
    assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
}
/// Like `build_handler(0)`, but with a small (capacity 2) submit queue and no
/// queue receiver returned — for tests that exercise the connection path.
fn build_handler_no_peers() -> (
    StatementHandler<TestNetwork, TestSync>,
    TestStatementStore,
    TestNetwork,
    TestNotificationService,
) {
    let statement_store = TestStatementStore::new();
    let (queue_sender, _queue_receiver) = async_channel::bounded(2);
    let network = TestNetwork::new();
    let notification_service = TestNotificationService::new();
    // Timers/streams are `pending()` so nothing fires unless a test drives
    // the handler explicitly.
    let handler = StatementHandler {
        protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
        notification_service: Box::new(notification_service.clone()),
        propagate_timeout: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = ()> + Send>>)
            .fuse(),
        pending_statements: FuturesUnordered::new(),
        pending_statements_peers: HashMap::new(),
        network: network.clone(),
        sync: TestSync::new(),
        sync_event_stream: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
            .fuse(),
        peers: HashMap::new(),
        statement_store: Arc::new(statement_store.clone()),
        queue_sender,
        statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
            .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
        metrics: None,
        initial_sync_timeout: Box::pin(futures::future::pending()),
        pending_affinities_timeout: Box::pin(futures::future::pending()),
        pending_initial_syncs: HashMap::new(),
        initial_sync_peer_queue: VecDeque::new(),
        deferred_peers: HashSet::new(),
        dropped_statements_during_sync: false,
        sync_recovery_peer: None,
        sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
    };
    (handler, statement_store, network, notification_service)
}
/// Same as `build_handler_no_peers`, but the network mock observes every peer
/// as a light client.
// NOTE(review): this duplicates `build_handler_no_peers` except for
// `TestNetwork::new_light()`; consider extracting a shared builder that
// takes the network as a parameter.
fn build_handler_no_peers_light() -> (
    StatementHandler<TestNetwork, TestSync>,
    TestStatementStore,
    TestNetwork,
    TestNotificationService,
) {
    let statement_store = TestStatementStore::new();
    let (queue_sender, _queue_receiver) = async_channel::bounded(2);
    let network = TestNetwork::new_light();
    let notification_service = TestNotificationService::new();
    let handler = StatementHandler {
        protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
        notification_service: Box::new(notification_service.clone()),
        propagate_timeout: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = ()> + Send>>)
            .fuse(),
        pending_statements: FuturesUnordered::new(),
        pending_statements_peers: HashMap::new(),
        network: network.clone(),
        sync: TestSync::new(),
        sync_event_stream: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
            .fuse(),
        peers: HashMap::new(),
        statement_store: Arc::new(statement_store.clone()),
        queue_sender,
        statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
            .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
        metrics: None,
        initial_sync_timeout: Box::pin(futures::future::pending()),
        pending_affinities_timeout: Box::pin(futures::future::pending()),
        pending_initial_syncs: HashMap::new(),
        initial_sync_peer_queue: VecDeque::new(),
        deferred_peers: HashSet::new(),
        dropped_statements_during_sync: false,
        sync_recovery_peer: None,
        sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
    };
    (handler, statement_store, network, notification_service)
}
#[tokio::test]
async fn test_initial_sync_burst_single_peer() {
    let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);
    // 200 statements of ~100KiB each: far more than fits into one burst.
    let num_statements = 200;
    let statement_size = 100 * 1024;
    let mut expected_hashes = Vec::new();
    for i in 0..num_statements {
        let mut statement = Statement::new();
        let mut data = vec![0u8; statement_size];
        // Two-byte counter keeps all 200 statements distinct.
        data[0] = (i % 256) as u8;
        data[1] = (i / 256) as u8;
        statement.set_plain_data(data);
        let hash = statement.hash();
        expected_hashes.push(hash);
        statement_store.statements.lock().unwrap().insert(hash, statement);
    }
    // A newly connected peer should get an initial sync scheduled.
    let peer_id = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer: peer_id,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
        })
        .await;
    assert!(handler.peers.contains_key(&peer_id));
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.initial_sync_peer_queue.len(), 1);
    // Drive bursts until the sync completes, with a safety cap.
    let mut burst_count = 0;
    while handler.pending_initial_syncs.contains_key(&peer_id) {
        handler.process_initial_sync_burst().await;
        burst_count += 1;
        assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
    }
    assert!(
        burst_count >= 10,
        "Expected multiple bursts for 200 statements of 100KB each, got {}",
        burst_count
    );
    let sent = notification_service.get_sent_notifications();
    let mut sent_hashes: Vec<_> = sent
        .iter()
        .flat_map(|(peer, notification)| {
            assert_eq!(*peer, peer_id);
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    // Order is unspecified, so compare sorted.
    sent_hashes.sort();
    expected_hashes.sort();
    assert_eq!(
        sent_hashes.len(),
        expected_hashes.len(),
        "Expected {} statements to be sent, got {}",
        expected_hashes.len(),
        sent_hashes.len()
    );
    assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
    assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
    assert!(handler.initial_sync_peer_queue.is_empty());
}
#[tokio::test]
async fn test_initial_sync_burst_multiple_peers_round_robin() {
    let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);
    let num_statements = 200;
    let statement_size = 100 * 1024;
    let mut expected_hashes = Vec::new();
    for i in 0..num_statements {
        let mut statement = Statement::new();
        let mut data = vec![0u8; statement_size];
        data[0] = (i % 256) as u8;
        data[1] = (i / 256) as u8;
        statement.set_plain_data(data);
        let hash = statement.hash();
        expected_hashes.push(hash);
        statement_store.statements.lock().unwrap().insert(hash, statement);
    }
    // Connect three peers; each should get its own initial sync.
    let peer1 = PeerId::random();
    let peer2 = PeerId::random();
    let peer3 = PeerId::random();
    for peer in [peer1, peer2, peer3] {
        handler
            .handle_notification_event(NotificationEvent::NotificationStreamOpened {
                peer,
                direction: sc_network::service::traits::Direction::Inbound,
                handshake: vec![],
                negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
            })
            .await;
    }
    assert_eq!(handler.peers.len(), 3);
    assert_eq!(handler.pending_initial_syncs.len(), 3);
    assert_eq!(handler.initial_sync_peer_queue.len(), 3);
    // Record which peer serves each burst, to verify round-robin order.
    let mut peer_burst_order = Vec::new();
    let mut burst_count = 0;
    while !handler.pending_initial_syncs.is_empty() {
        if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
            peer_burst_order.push(next_peer);
        }
        handler.process_initial_sync_burst().await;
        burst_count += 1;
        assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
    }
    assert!(
        burst_count >= 30,
        "Expected many bursts for 3 peers with 200 statements each, got {}",
        burst_count
    );
    // The first two full rounds must alternate peer1 -> peer2 -> peer3.
    assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
    assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
    assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
    assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
    assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
    assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
    assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
    let sent = notification_service.get_sent_notifications();
    let mut peer1_hashes = get_peer_hashes(&sent, peer1);
    let mut peer2_hashes = get_peer_hashes(&sent, peer2);
    let mut peer3_hashes = get_peer_hashes(&sent, peer3);
    peer1_hashes.sort();
    peer2_hashes.sort();
    peer3_hashes.sort();
    expected_hashes.sort();
    assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
    assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
    assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
    assert!(handler.pending_initial_syncs.is_empty());
    assert!(handler.initial_sync_peer_queue.is_empty());
}
#[tokio::test]
async fn test_send_statements_in_chunks_exact_max_size() {
    let (mut handler, statement_store, _network, notification_service, _queue_receiver, _) =
        build_handler(1);
    // Budget for statement bytes: the notification limit minus the SCALE
    // compact length prefix of the outer `Vec<Statement>`.
    let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
    let num_statements: usize = 100;
    // Fixed SCALE overhead per statement beyond its plain-data payload.
    // NOTE(review): derived empirically for this statement shape — the `==`
    // assertion below verifies it still holds.
    let per_statement_overhead = 1 + 1 + 8 + 1 + 2;
    let total_overhead = per_statement_overhead * num_statements;
    let total_data_size = max_size - total_overhead;
    let per_statement_data_size = total_data_size / num_statements;
    // Distribute the division remainder one byte at a time so the batch
    // totals *exactly* `max_size` once encoded.
    let remainder = total_data_size % num_statements;
    let mut expected_hashes = Vec::with_capacity(num_statements);
    let mut total_encoded_size = 0;
    for i in 0..num_statements {
        let mut statement = Statement::new();
        let extra = if i < remainder { 1 } else { 0 };
        let mut data = vec![42u8; per_statement_data_size + extra];
        // Two-byte counter keeps every statement (and hash) unique.
        data[0] = i as u8;
        data[1] = (i >> 8) as u8;
        statement.set_plain_data(data);
        total_encoded_size += statement.encoded_size();
        let hash = statement.hash();
        expected_hashes.push(hash);
        statement_store.recent_statements.lock().unwrap().insert(hash, statement);
    }
    // Fix: the condition checks exact equality, but the old message claimed
    // "<=". The message now matches the condition.
    assert!(
        total_encoded_size == max_size,
        "Total encoded size {} should exactly equal max_size {}",
        total_encoded_size,
        max_size
    );
    handler.propagate_statements().await;
    // A batch of exactly `max_size` bytes must fit into ONE notification.
    let sent = notification_service.get_sent_notifications();
    assert_eq!(
        sent.len(),
        1,
        "Expected 1 notification for all {} statements, but got {}",
        num_statements,
        sent.len()
    );
    let (_peer, notification) = &sent[0];
    assert!(
        notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
        "Notification size {} exceeds limit {}",
        notification.len(),
        MAX_STATEMENT_NOTIFICATION_SIZE
    );
    let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
    assert_eq!(
        decoded.len(),
        num_statements,
        "Expected {} statements in the notification",
        num_statements
    );
    let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
    expected_hashes.sort();
    received_hashes.sort();
    assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
}
#[tokio::test]
async fn test_initial_sync_burst_size_limit_consistency() {
    let (mut handler, statement_store, _network, notification_service, _, _) = build_handler(0);
    let payload_limit = max_statement_payload_size(V1_ENVELOPE_OVERHEAD);
    // First statement fills just over half the budget...
    let first_stmt_data_size = payload_limit / 2 + 10;
    let mut stmt1 = Statement::new();
    stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
    let stmt1_encoded_size = stmt1.encoded_size();
    let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
    // ...and the second is sized so the pair just exceeds the limit.
    // NOTE(review): the +3 / -4 offsets presumably account for SCALE
    // encoding overhead of this statement shape — the assert below verifies
    // the resulting sizes actually overflow the limit.
    let target_stmt2_encoded = remaining + 3;
    let stmt2_data_size = target_stmt2_encoded.saturating_sub(4);
    let mut stmt2 = Statement::new();
    stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
    let stmt2_encoded_size = stmt2.encoded_size();
    let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
    assert!(
        total_encoded > payload_limit,
        "Total {} should exceed payload_limit {} so filter rejects second statement",
        total_encoded,
        payload_limit
    );
    let hash1 = stmt1.hash();
    let hash2 = stmt2.hash();
    statement_store.statements.lock().unwrap().insert(hash1, stmt1);
    statement_store.statements.lock().unwrap().insert(hash2, stmt2);
    let peer_id = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer: peer_id,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
        })
        .await;
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
    // First burst: only one statement fits (order is unspecified, so either
    // statement may go first).
    handler.process_initial_sync_burst().await;
    let sent = notification_service.get_sent_notifications();
    assert_eq!(sent.len(), 1, "First burst should send one notification");
    let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
    assert_eq!(decoded.len(), 1, "First notification should contain one statement");
    let sent_hash = decoded[0].hash();
    assert!(
        sent_hash == hash1 || sent_hash == hash2,
        "Sent statement should be one of the two created"
    );
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
    // Second burst delivers the remaining statement and completes the sync.
    handler.process_initial_sync_burst().await;
    let sent = notification_service.get_sent_notifications();
    assert_eq!(sent.len(), 2, "Second burst should send another notification");
    let mut sent_hashes: Vec<_> = sent
        .iter()
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    sent_hashes.sort();
    let mut expected_hashes = vec![hash1, hash2];
    expected_hashes.sort();
    assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
    assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
}
#[tokio::test]
async fn test_peer_disconnected_on_flooding() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
        build_handler(1);
    let peer_id = *handler.peers.keys().next().unwrap();
    // 600k distinct statements in one call: far above any rate limit.
    let mut flood_statements = Vec::new();
    for i in 0..600_000 {
        let mut statement = Statement::new();
        statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
        flood_statements.push(statement);
    }
    handler.on_statements(peer_id, flood_statements);
    // Flooding must be both reported...
    let reports = network.get_reports();
    assert!(
        reports
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
        "Expected STATEMENT_FLOODING reputation change, but got: {:?}",
        reports
    );
    // ...and punished with a disconnect.
    let disconnected = network.get_disconnected_peers();
    assert!(
        disconnected.contains(&peer_id),
        "Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
        peer_id,
        disconnected
    );
    // After the close event is delivered, all per-peer state must be gone.
    dispatch_disconnects(&mut handler, &network).await;
    assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
    assert!(
        !handler.pending_initial_syncs.contains_key(&peer_id),
        "Peer should be removed from pending_initial_syncs"
    );
    assert!(
        !handler.initial_sync_peer_queue.contains(&peer_id),
        "Peer should be removed from initial_sync_peer_queue"
    );
}
#[tokio::test]
async fn test_legitimate_traffic_not_flagged() {
    // Sends ~5k statements every 100ms for 5 wall-clock seconds (~50k/sec).
    // This rate must stay under the flooding threshold: no report, no
    // disconnect, peer remains connected.
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
        build_handler(1);
    let peer = *handler.peers.keys().next().unwrap();

    let begin = std::time::Instant::now();
    let mut seq = 0u32;
    while begin.elapsed() < std::time::Duration::from_secs(5) {
        // Unique payloads so no statement is deduplicated away.
        let batch: Vec<_> = (0..5_000u32)
            .map(|i| {
                let mut s = Statement::new();
                s.set_plain_data(vec![seq as u8, (seq >> 8) as u8, (seq >> 16) as u8, i as u8]);
                seq = seq.wrapping_add(1);
                s
            })
            .collect();
        handler.on_statements(peer, batch);
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }

    let reports = network.get_reports();
    assert!(
        !reports
            .iter()
            .any(|(id, rep)| *id == peer && *rep == rep::STATEMENT_FLOODING),
        "Legitimate traffic should not trigger flooding detection. Reports: {:?}",
        reports
    );
    let disconnected = network.get_disconnected_peers();
    assert!(
        !disconnected.contains(&peer),
        "Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
        disconnected
    );
    assert!(handler.peers.contains_key(&peer), "Peer should still be connected");
}
#[tokio::test]
async fn test_just_over_rate_limit_triggers_flooding() {
    // 260k statements in one batch: just above the allowed burst
    // (rate x STATEMENTS_BURST_COEFFICIENT) — must be flagged as flooding.
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
        build_handler(1);
    let peer = *handler.peers.keys().next().unwrap();

    let batch: Vec<_> = (0..260_000u32)
        .map(|n| {
            let mut s = Statement::new();
            s.set_plain_data(vec![n as u8, (n >> 8) as u8, (n >> 16) as u8, (n >> 24) as u8]);
            s
        })
        .collect();
    handler.on_statements(peer, batch);

    let reports = network.get_reports();
    let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
    assert!(
        reports
            .iter()
            .any(|(id, rep)| *id == peer && *rep == rep::STATEMENT_FLOODING),
        "Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
        expected_burst,
        reports
    );
    let disconnected = network.get_disconnected_peers();
    assert!(
        disconnected.contains(&peer),
        "Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
        disconnected
    );
    dispatch_disconnects(&mut handler, &network).await;
    assert!(!handler.peers.contains_key(&peer), "Peer should be removed from peers map");
}
#[tokio::test]
async fn test_burst_of_250k_statements_allowed() {
    // 250k statements in one batch sits within the burst allowance, so the
    // peer must NOT be reported or disconnected.
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
        build_handler(1);
    let peer = *handler.peers.keys().next().unwrap();

    let batch: Vec<_> = (0..250_000u32)
        .map(|n| {
            let mut s = Statement::new();
            s.set_plain_data(vec![n as u8, (n >> 8) as u8, (n >> 16) as u8, (n >> 24) as u8]);
            s
        })
        .collect();
    handler.on_statements(peer, batch);

    let reports = network.get_reports();
    assert!(
        !reports
            .iter()
            .any(|(id, rep)| *id == peer && *rep == rep::STATEMENT_FLOODING),
        "250k burst should be allowed (burst = rate × 5). Reports: {:?}",
        reports
    );
    assert!(
        handler.peers.contains_key(&peer),
        "Peer should still be connected after 250k burst"
    );
}
#[tokio::test]
async fn test_sustained_rate_above_limit_triggers_flooding() {
    // 30k statements every 100ms (~300k/sec sustained) must eventually
    // exhaust the burst allowance and be flagged as flooding.
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver, _) =
        build_handler(1);
    let peer = *handler.peers.keys().next().unwrap();

    let mut seq = 0u32;
    let begin = std::time::Instant::now();
    let mut flooding_detected = false;
    while begin.elapsed() < std::time::Duration::from_secs(5) {
        let batch: Vec<_> = (0..30_000u32)
            .map(|i| {
                let mut s = Statement::new();
                s.set_plain_data(vec![seq as u8, (seq >> 8) as u8, (seq >> 16) as u8, i as u8]);
                seq = seq.wrapping_add(1);
                s
            })
            .collect();
        handler.on_statements(peer, batch);
        if network
            .get_reports()
            .iter()
            .any(|(id, rep)| *id == peer && *rep == rep::STATEMENT_FLOODING)
        {
            flooding_detected = true;
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
    assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");

    let disconnected = network.get_disconnected_peers();
    assert!(
        disconnected.contains(&peer),
        "Peer should be disconnected after sustained high rate. Disconnected: {:?}",
        disconnected
    );
    dispatch_disconnects(&mut handler, &network).await;
    assert!(!handler.peers.contains_key(&peer), "Peer should be removed from peers map");
}
#[tokio::test]
async fn test_v2_peer_detected_when_no_fallback() {
    // No negotiated fallback protocol means the native (V2) protocol was
    // agreed, so the peer must be classified as V2.
    let (mut handler, _statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let new_peer = PeerId::random();
    let event = NotificationEvent::NotificationStreamOpened {
        peer: new_peer,
        direction: sc_network::service::traits::Direction::Inbound,
        handshake: vec![],
        negotiated_fallback: None,
    };
    handler.handle_notification_event(event).await;
    let version = handler.peers.get(&new_peer).unwrap().protocol_version;
    assert_eq!(
        version,
        PeerProtocolVersion::V2,
        "Peer should be detected as v2 when no fallback is negotiated"
    );
}
#[tokio::test]
async fn test_v1_peer_detected_when_fallback_negotiated() {
    // A negotiated fallback to the V1 protocol name must classify the peer
    // as V1.
    let (mut handler, _statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let new_peer = PeerId::random();
    let event = NotificationEvent::NotificationStreamOpened {
        peer: new_peer,
        direction: sc_network::service::traits::Direction::Inbound,
        handshake: vec![],
        negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
    };
    handler.handle_notification_event(event).await;
    let version = handler.peers.get(&new_peer).unwrap().protocol_version;
    assert_eq!(
        version,
        PeerProtocolVersion::V1,
        "Peer should be detected as v1 when fallback is negotiated"
    );
}
#[tokio::test]
async fn test_v1_peer_decodes_raw_statements() {
    // A V1 peer sends a bare SCALE-encoded Vec<Statement> (no envelope);
    // the handler must decode it and forward the statement to the queue.
    let (mut handler, _statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();
    let (queue_sender, queue_receiver) = async_channel::bounded(10);
    handler.queue_sender = queue_sender;

    // Connect with a negotiated fallback so the peer is classified as V1.
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
        })
        .await;

    let mut stmt = Statement::new();
    stmt.set_plain_data(b"v1 statement".to_vec());
    let expected_hash = stmt.hash();
    let payload = vec![stmt].encode();
    handler
        .handle_notification_event(NotificationEvent::NotificationReceived {
            peer,
            notification: payload.into(),
        })
        .await;

    let (received, _) = queue_receiver.try_recv().unwrap();
    assert_eq!(received.hash(), expected_hash, "V1 peer's raw statement should be decoded correctly");
}
#[tokio::test]
async fn test_v2_peer_decodes_statement_message() {
    // A V2 peer wraps statements in a StatementMessage envelope; the handler
    // must unwrap it and forward the statement to the queue.
    let (mut handler, _statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();
    let (queue_sender, queue_receiver) = async_channel::bounded(10);
    handler.queue_sender = queue_sender;

    // No fallback => the peer speaks the native V2 protocol.
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;

    let mut stmt = Statement::new();
    stmt.set_plain_data(b"v2 statement".to_vec());
    let expected_hash = stmt.hash();
    let payload = StatementMessage::Statements(vec![stmt]).encode();
    handler
        .handle_notification_event(NotificationEvent::NotificationReceived {
            peer,
            notification: payload.into(),
        })
        .await;

    let (received, _) = queue_receiver.try_recv().unwrap();
    assert_eq!(received.hash(), expected_hash, "V2 peer's StatementMessage should be decoded correctly");
}
#[tokio::test]
async fn test_v2_peer_topic_affinity_stored() {
    // A freshly connected V2 peer has no topic affinity; after it sends an
    // ExplicitTopicAffinity message (and pending affinities are processed),
    // the filter must be stored and match the announced topic.
    let (mut handler, _statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;
    assert!(
        handler.peers.get(&peer).unwrap().topic_affinity.is_none(),
        "Topic affinity should be None initially"
    );

    let topic: [u8; 32] = [0xAA; 32];
    let mut bloom = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
    bloom.insert(&topic);
    let payload = StatementMessage::ExplicitTopicAffinity(bloom).encode();
    handler
        .handle_notification_event(NotificationEvent::NotificationReceived {
            peer,
            notification: payload.into(),
        })
        .await;
    handler.process_pending_affinities();

    let peer_data = handler.peers.get(&peer).unwrap();
    assert!(
        peer_data.topic_affinity.is_some(),
        "Topic affinity should be set after receiving ExplicitTopicAffinity"
    );
    assert!(
        peer_data.topic_affinity.as_ref().unwrap().contains(&topic),
        "Stored affinity filter should match the topic"
    );
}
#[tokio::test]
async fn test_topic_affinity_filters_propagation() {
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers();
let peer_id = PeerId::random();
handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
peer: peer_id,
direction: sc_network::service::traits::Direction::Inbound,
handshake: vec![],
negotiated_fallback: None,
})
.await;
let topic_aa: [u8; 32] = [0xAA; 32];
let topic_bb: [u8; 32] = [0xBB; 32];
let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
filter.insert(&topic_aa);
let msg = StatementMessage::ExplicitTopicAffinity(filter);
let encoded = msg.encode();
handler
.handle_notification_event(NotificationEvent::NotificationReceived {
peer: peer_id,
notification: encoded.into(),
})
.await;
handler.process_pending_affinities();
let mut stmt_matching = Statement::new();
stmt_matching.set_plain_data(b"matching".to_vec());
stmt_matching.set_topic(0, topic_aa.into());
let hash_matching = stmt_matching.hash();
let mut stmt_not_matching = Statement::new();
stmt_not_matching.set_plain_data(b"not matching".to_vec());
stmt_not_matching.set_topic(0, topic_bb.into());
let hash_not_matching = stmt_not_matching.hash();
let mut stmt_no_topic = Statement::new();
stmt_no_topic.set_plain_data(b"no topic".to_vec());
let hash_no_topic = stmt_no_topic.hash();
statement_store
.recent_statements
.lock()
.unwrap()
.insert(hash_matching, stmt_matching);
statement_store
.recent_statements
.lock()
.unwrap()
.insert(hash_not_matching, stmt_not_matching);
statement_store
.recent_statements
.lock()
.unwrap()
.insert(hash_no_topic, stmt_no_topic);
handler.propagate_statements().await;
let sent = notification_service.get_sent_notifications();
let mut sent_hashes: Vec<_> = sent
.iter()
.flat_map(|(_, notification)| {
match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
StatementMessage::Statements(stmts) => stmts,
_ => panic!("Expected StatementMessage::Statements"),
}
})
.map(|s| s.hash())
.collect();
sent_hashes.sort();
assert!(
sent_hashes.contains(&hash_matching),
"Statement matching topic affinity should be propagated"
);
assert!(
sent_hashes.contains(&hash_no_topic),
"Statement with no topics should be propagated (broadcast)"
);
assert!(
!sent_hashes.contains(&hash_not_matching),
"Statement NOT matching topic affinity should be filtered out"
);
}
#[tokio::test]
async fn test_v1_peer_no_topic_filtering() {
    // V1 peers cannot announce topic affinity, so they must receive every
    // statement regardless of topics.
    let (mut handler, statement_store, _network, notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
        })
        .await;

    let topic_aa: [u8; 32] = [0xAA; 32];
    let mut stmt_with_topic = Statement::new();
    stmt_with_topic.set_plain_data(b"with topic".to_vec());
    stmt_with_topic.set_topic(0, topic_aa.into());
    let hash_with_topic = stmt_with_topic.hash();
    let mut stmt_no_topic = Statement::new();
    stmt_no_topic.set_plain_data(b"no topic".to_vec());
    let hash_no_topic = stmt_no_topic.hash();
    {
        // Single lock scope instead of re-locking per insert.
        let mut recent = statement_store.recent_statements.lock().unwrap();
        recent.insert(hash_with_topic, stmt_with_topic);
        recent.insert(hash_no_topic, stmt_no_topic);
    }

    handler.propagate_statements().await;
    let sent = notification_service.get_sent_notifications();
    // V1 wire format: bare Vec<Statement>.
    let sent_hashes: Vec<_> = sent
        .iter()
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    assert_eq!(
        sent_hashes.len(),
        2,
        "V1 peer should receive all statements regardless of topics"
    );
    assert!(sent_hashes.contains(&hash_with_topic));
    assert!(sent_hashes.contains(&hash_no_topic));
}
#[tokio::test]
// Verifies the full affinity life-cycle for a *light* V2 peer:
// 1. connecting alone schedules no initial sync,
// 2. announcing an affinity schedules a sync filtered by that affinity,
// 3. changing the affinity re-schedules a sync and re-sends statements the
//    peer already received (known_statements is cleared on change).
async fn test_affinity_change_triggers_resync() {
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers_light();
let peer_id = PeerId::random();
// Three statements in the store: one per topic plus one topic-less.
let topic_aa: [u8; 32] = [0xAA; 32];
let topic_bb: [u8; 32] = [0xBB; 32];
let mut stmt_aa = Statement::new();
stmt_aa.set_plain_data(b"stmt_aa".to_vec());
stmt_aa.set_topic(0, topic_aa.into());
let hash_aa = stmt_aa.hash();
let mut stmt_bb = Statement::new();
stmt_bb.set_plain_data(b"stmt_bb".to_vec());
stmt_bb.set_topic(0, topic_bb.into());
let hash_bb = stmt_bb.hash();
let mut stmt_no_topic = Statement::new();
stmt_no_topic.set_plain_data(b"no topic".to_vec());
let hash_no_topic = stmt_no_topic.hash();
statement_store.statements.lock().unwrap().insert(hash_aa, stmt_aa);
statement_store.statements.lock().unwrap().insert(hash_bb, stmt_bb);
statement_store.statements.lock().unwrap().insert(hash_no_topic, stmt_no_topic);
// Phase 1: connect as V2 (no fallback). A light peer without affinity
// must not get an initial sync yet.
handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
peer: peer_id,
direction: sc_network::service::traits::Direction::Inbound,
handshake: vec![],
negotiated_fallback: None,
})
.await;
assert!(
!handler.pending_initial_syncs.contains_key(&peer_id),
"Light V2 peer should NOT have initial sync scheduled on connect"
);
// Phase 2: announce affinity for topic_aa; this must schedule a sync.
let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
filter.insert(&topic_aa);
let msg = StatementMessage::ExplicitTopicAffinity(filter);
let encoded = msg.encode();
handler
.handle_notification_event(NotificationEvent::NotificationReceived {
peer: peer_id,
notification: encoded.into(),
})
.await;
handler.process_pending_affinities();
assert!(
handler.pending_initial_syncs.contains_key(&peer_id),
"Initial sync should be scheduled after setting affinity"
);
// Drain the scheduled sync in bursts until nothing is pending.
while handler.pending_initial_syncs.contains_key(&peer_id) {
handler.process_initial_sync_burst().await;
}
let sent = notification_service.get_sent_notifications();
let sent_hashes: HashSet<_> = sent
.iter()
.flat_map(|(_, notification)| {
match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
StatementMessage::Statements(stmts) => stmts,
_ => panic!("Expected StatementMessage::Statements"),
}
})
.map(|s| s.hash())
.collect();
// With affinity = {topic_aa}: stmt_aa and topic-less stmt pass, stmt_bb is
// filtered out.
assert!(sent_hashes.contains(&hash_aa), "stmt_aa should be sent (matches affinity)");
assert!(
sent_hashes.contains(&hash_no_topic),
"stmt_no_topic should be sent (broadcast, no topic)"
);
assert!(!sent_hashes.contains(&hash_bb), "stmt_bb should NOT be sent (filtered)");
// Phase 3: replace the affinity with {topic_bb}; a fresh sync must be
// scheduled and previously-sent statements re-sent.
let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
filter.insert(&topic_bb);
let msg = StatementMessage::ExplicitTopicAffinity(filter);
let encoded = msg.encode();
handler
.handle_notification_event(NotificationEvent::NotificationReceived {
peer: peer_id,
notification: encoded.into(),
})
.await;
handler.process_pending_affinities();
assert!(
handler.pending_initial_syncs.contains_key(&peer_id),
"Initial sync should be re-scheduled after affinity change"
);
// Clear the recorder so only the second sync's traffic is inspected.
notification_service.clear_sent_notifications();
while handler.pending_initial_syncs.contains_key(&peer_id) {
handler.process_initial_sync_burst().await;
}
let sent_after_bb = notification_service.get_sent_notifications();
let sent_hashes_bb: HashSet<_> = sent_after_bb
.iter()
.flat_map(|(_, notification)| {
match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
StatementMessage::Statements(stmts) => stmts,
_ => panic!("Expected StatementMessage::Statements"),
}
})
.map(|s| s.hash())
.collect();
assert!(
sent_hashes_bb.contains(&hash_bb),
"stmt_bb should now be sent after affinity changed to topic_bb"
);
assert!(
sent_hashes_bb.contains(&hash_no_topic),
"stmt_no_topic should be re-sent (known_statements cleared on affinity change)"
);
}
#[tokio::test]
// Verifies that widening a peer's affinity re-delivers statements that the
// earlier, narrower affinity filtered out, and that filtered statements are
// never marked as known for the peer.
async fn test_affinity_change_sends_previously_filtered_statements() {
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers_light();
let peer_id = PeerId::random();
let topic_aa: [u8; 32] = [0xAA; 32];
let topic_bb: [u8; 32] = [0xBB; 32];
let mut stmt_aa = Statement::new();
stmt_aa.set_plain_data(b"stmt_aa".to_vec());
stmt_aa.set_topic(0, topic_aa.into());
let hash_aa = stmt_aa.hash();
let mut stmt_bb = Statement::new();
stmt_bb.set_plain_data(b"stmt_bb".to_vec());
stmt_bb.set_topic(0, topic_bb.into());
let hash_bb = stmt_bb.hash();
// Seed both the full store (used by initial sync) and the recent set
// (used by propagate_statements).
statement_store.statements.lock().unwrap().insert(hash_aa, stmt_aa.clone());
statement_store.statements.lock().unwrap().insert(hash_bb, stmt_bb.clone());
statement_store.recent_statements.lock().unwrap().insert(hash_aa, stmt_aa);
statement_store.recent_statements.lock().unwrap().insert(hash_bb, stmt_bb);
handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
peer: peer_id,
direction: sc_network::service::traits::Direction::Inbound,
handshake: vec![],
negotiated_fallback: None,
})
.await;
// Phase 1: affinity = {topic_aa}; drain the resulting initial sync.
let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
filter.insert(&topic_aa);
let msg = StatementMessage::ExplicitTopicAffinity(filter);
let encoded = msg.encode();
handler
.handle_notification_event(NotificationEvent::NotificationReceived {
peer: peer_id,
notification: encoded.into(),
})
.await;
handler.process_pending_affinities();
while handler.pending_initial_syncs.contains_key(&peer_id) {
handler.process_initial_sync_burst().await;
}
let sent = notification_service.get_sent_notifications();
let sent_hashes: HashSet<_> = sent
.iter()
.flat_map(|(_, notification)| {
match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
StatementMessage::Statements(stmts) => stmts,
_ => panic!("Expected StatementMessage::Statements"),
}
})
.map(|s| s.hash())
.collect();
assert!(sent_hashes.contains(&hash_aa), "stmt_aa should be sent (matches affinity)");
assert!(
!sent_hashes.contains(&hash_bb),
"stmt_bb should NOT be sent (filtered by affinity)"
);
// Propagation must not record the filtered statement as known, otherwise
// it could never be re-sent later.
handler.propagate_statements().await;
let peer = handler.peers.get(&peer_id).unwrap();
assert!(
!peer.known_statements.contains(&hash_bb),
"stmt_bb should NOT be in known_statements (filtered by affinity)"
);
assert!(peer.known_statements.contains(&hash_aa), "stmt_aa should be in known_statements");
// Phase 2: widen affinity to {topic_aa, topic_bb}; both statements must
// arrive in the re-sync (known_statements is cleared on change).
let mut filter = AffinityFilter::new(BLOOM_SEED, 0.01, 100);
filter.insert(&topic_aa);
filter.insert(&topic_bb);
let msg = StatementMessage::ExplicitTopicAffinity(filter);
let encoded = msg.encode();
notification_service.clear_sent_notifications();
handler
.handle_notification_event(NotificationEvent::NotificationReceived {
peer: peer_id,
notification: encoded.into(),
})
.await;
handler.process_pending_affinities();
while handler.pending_initial_syncs.contains_key(&peer_id) {
handler.process_initial_sync_burst().await;
}
let sent = notification_service.get_sent_notifications();
let sent_hashes: HashSet<_> = sent
.iter()
.flat_map(|(_, notification)| {
match StatementMessage::decode(&mut notification.as_slice()).unwrap() {
StatementMessage::Statements(stmts) => stmts,
_ => panic!("Expected StatementMessage::Statements"),
}
})
.map(|s| s.hash())
.collect();
assert!(
sent_hashes.contains(&hash_bb),
"stmt_bb should now be sent after affinity expanded to include topic_bb"
);
assert!(
sent_hashes.contains(&hash_aa),
"stmt_aa should be re-sent (known_statements cleared on affinity change)"
);
}
#[test]
fn test_encode_statement_refs_matches_derive_encoding() {
    // The hand-rolled ref-slice encoder must produce exactly the bytes of
    // the derived Encode impl for StatementMessage::Statements.
    let mut first = Statement::new();
    first.set_plain_data(b"first".to_vec());
    let mut second = Statement::new();
    second.set_plain_data(b"second".to_vec());
    let by_refs = StatementMessage::encode_statement_refs(&[&first, &second]);
    let by_derive = StatementMessage::Statements(vec![first, second]).encode();
    assert_eq!(
        by_refs, by_derive,
        "encode_statement_refs must produce identical bytes to derive Encode"
    );
}
#[test]
fn test_encode_statement_refs_empty() {
    // Degenerate case: an empty statement list must encode identically to
    // the derived impl as well.
    let no_refs: Vec<&Statement> = Vec::new();
    let by_refs = StatementMessage::encode_statement_refs(&no_refs);
    let by_derive = StatementMessage::Statements(vec![]).encode();
    assert_eq!(by_refs, by_derive);
}
#[test]
fn test_can_receive_all_combinations() {
    // Exercises can_receive() across (is_light, version, has_affinity):
    // the only combination that cannot receive is a light V2 peer that has
    // not yet announced a topic affinity.
    let make_peer = |is_light: bool, version: PeerProtocolVersion, has_affinity: bool| Peer {
        known_statements: LruHashSet::new(NonZeroUsize::new(10).unwrap()),
        rate_limiter: PeerRateLimiter::new(
            NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).expect("nonzero"),
            NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT)
                .expect("nonzero"),
        ),
        protocol_version: version,
        topic_affinity: has_affinity.then(|| AffinityFilter::new(BLOOM_SEED, 0.01, 10)),
        is_light,
        pending_topic_affinity: None,
    };
    // (is_light, version, has_affinity, expected can_receive)
    let cases = [
        (false, PeerProtocolVersion::V1, false, true),
        (false, PeerProtocolVersion::V2, false, true),
        (true, PeerProtocolVersion::V1, false, true),
        (true, PeerProtocolVersion::V2, false, false),
        (true, PeerProtocolVersion::V2, true, true),
        (false, PeerProtocolVersion::V2, true, true),
    ];
    for (is_light, version, has_affinity, expected) in cases {
        assert_eq!(make_peer(is_light, version, has_affinity).can_receive(), expected);
    }
}
#[tokio::test]
async fn test_send_chunk_v1_vs_v2_encoding() {
let (mut handler, _statement_store, _network, notification_service) =
build_handler_no_peers();
let v1_peer = PeerId::random();
let v2_peer = PeerId::random();
handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
peer: v1_peer,
direction: sc_network::service::traits::Direction::Inbound,
handshake: vec![],
negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
})
.await;
handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
peer: v2_peer,
direction: sc_network::service::traits::Direction::Inbound,
handshake: vec![],
negotiated_fallback: None,
})
.await;
let mut stmt = Statement::new();
stmt.set_plain_data(b"encoding test".to_vec());
notification_service.clear_sent_notifications();
handler.send_statement_chunk(&v1_peer, &[&stmt]).await;
let v1_sent = notification_service.get_sent_notifications();
assert_eq!(v1_sent.len(), 1);
let v1_bytes = &v1_sent[0].1;
let decoded_v1 = <Statements as Decode>::decode(&mut v1_bytes.as_slice())
.expect("V1 peer should receive raw Vec<Statement> encoding");
assert_eq!(decoded_v1.len(), 1);
notification_service.clear_sent_notifications();
handler.send_statement_chunk(&v2_peer, &[&stmt]).await;
let v2_sent = notification_service.get_sent_notifications();
assert_eq!(v2_sent.len(), 1);
let v2_bytes = &v2_sent[0].1;
let decoded_v2 = StatementMessage::decode(&mut v2_bytes.as_slice())
.expect("V2 peer should receive StatementMessage encoding");
match decoded_v2 {
StatementMessage::Statements(stmts) => assert_eq!(stmts.len(), 1),
_ => panic!("Expected StatementMessage::Statements for V2 peer"),
}
assert_ne!(v1_bytes, v2_bytes, "V1 and V2 encodings should differ");
}
#[tokio::test]
async fn test_schedule_initial_sync_replaces_existing() {
    // Re-scheduling an initial sync for an already-queued peer must replace
    // the pending sync (picking up new statements) without duplicating the
    // peer in the queue.
    let (mut handler, statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();

    let mut first = Statement::new();
    first.set_plain_data(b"stmt1".to_vec());
    let hash1 = first.hash();
    statement_store.statements.lock().unwrap().insert(hash1, first);

    // Connecting schedules the first initial sync (containing stmt1).
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: Some(format!("/{STATEMENT_PROTOCOL_V1}").into()),
        })
        .await;
    assert!(handler.pending_initial_syncs.contains_key(&peer));
    assert_eq!(
        handler.initial_sync_peer_queue.iter().filter(|p| **p == peer).count(),
        1,
        "Peer should appear exactly once in the queue"
    );

    let mut second = Statement::new();
    second.set_plain_data(b"stmt2".to_vec());
    let hash2 = second.hash();
    statement_store.statements.lock().unwrap().insert(hash2, second);
    handler.schedule_initial_sync_for_peer(peer);

    assert_eq!(
        handler.initial_sync_peer_queue.iter().filter(|p| **p == peer).count(),
        1,
        "Peer should NOT be duplicated in the queue after re-schedule"
    );
    let pending = handler.pending_initial_syncs.get(&peer).unwrap();
    assert!(pending.hashes.contains(&hash1));
    assert!(pending.hashes.contains(&hash2));
}
#[tokio::test]
// Verifies that an initial sync scheduled while the chain is major-syncing
// is queued but not processed, and is then drained once major sync ends.
// The handler is constructed by hand here (rather than via build_handler)
// so the TestSync's major_syncing flag can be flipped mid-test.
async fn test_initial_sync_queued_during_major_sync_processed_after() {
let statement_store = TestStatementStore::new();
let (queue_sender, _queue_receiver) = async_channel::bounded(2);
let network = TestNetwork::new();
let notification_service = TestNotificationService::new();
let sync = TestSync::new();
// Start in the "major syncing" state.
sync.major_syncing.store(true, Ordering::Relaxed);
let mut handler = StatementHandler {
protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
notification_service: Box::new(notification_service.clone()),
// Timers/streams that never fire, so the test drives everything manually.
propagate_timeout: (Box::pin(futures::stream::pending())
as Pin<Box<dyn Stream<Item = ()> + Send>>)
.fuse(),
pending_statements: FuturesUnordered::new(),
pending_statements_peers: HashMap::new(),
network: network.clone(),
sync: sync.clone(),
sync_event_stream: (Box::pin(futures::stream::pending())
as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
.fuse(),
peers: HashMap::new(),
statement_store: Arc::new(statement_store.clone()),
queue_sender,
statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
metrics: None,
initial_sync_timeout: Box::pin(futures::future::pending()),
pending_affinities_timeout: Box::pin(futures::future::pending()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
deferred_peers: HashSet::new(),
dropped_statements_during_sync: false,
sync_recovery_peer: None,
sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
};
// One statement in the store so the scheduled sync is non-empty.
let mut stmt = Statement::new();
stmt.set_plain_data(b"during major sync".to_vec());
let hash = stmt.hash();
statement_store.statements.lock().unwrap().insert(hash, stmt);
let peer_id = PeerId::random();
handler.peers.insert(
peer_id,
Peer::new_for_testing(
LruHashSet::new(NonZeroUsize::new(100).unwrap()),
NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).unwrap(),
NonZeroU32::new(
DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
)
.unwrap(),
),
);
handler.schedule_initial_sync_for_peer(peer_id);
assert!(
handler.pending_initial_syncs.contains_key(&peer_id),
"Initial sync should be queued even during major sync"
);
assert_eq!(handler.initial_sync_peer_queue.len(), 1);
// While major-syncing, a burst must be a no-op.
handler.process_initial_sync_burst().await;
assert!(
handler.pending_initial_syncs.contains_key(&peer_id),
"Pending sync should remain untouched during major sync"
);
// End major sync; the next burst must drain the queued peer.
sync.major_syncing.store(false, Ordering::Relaxed);
handler.process_initial_sync_burst().await;
assert!(
handler.initial_sync_peer_queue.is_empty(),
"Peer should have been processed after major sync ended"
);
}
#[tokio::test]
async fn test_schedule_initial_sync_resends_all_matching() {
    // schedule_initial_sync_for_peer must enqueue every store statement —
    // including ones the peer already knows — and clear the peer's
    // known_statements set so everything is re-sent.
    let (mut handler, statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();

    let mut known_stmt = Statement::new();
    known_stmt.set_plain_data(b"known".to_vec());
    let hash1 = known_stmt.hash();
    let mut unknown_stmt = Statement::new();
    unknown_stmt.set_plain_data(b"unknown".to_vec());
    let hash2 = unknown_stmt.hash();
    {
        let mut store = statement_store.statements.lock().unwrap();
        store.insert(hash1, known_stmt);
        store.insert(hash2, unknown_stmt);
    }

    // Insert a V1 full-node peer that already knows hash1.
    let mut known = LruHashSet::new(NonZeroUsize::new(100).unwrap());
    known.insert(hash1);
    let peer_entry = Peer {
        known_statements: known,
        rate_limiter: PeerRateLimiter::new(
            NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND).unwrap(),
            NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT)
                .unwrap(),
        ),
        protocol_version: PeerProtocolVersion::V1,
        topic_affinity: None,
        is_light: false,
        pending_topic_affinity: None,
    };
    handler.peers.insert(peer, peer_entry);

    handler.schedule_initial_sync_for_peer(peer);
    let pending = handler.pending_initial_syncs.get(&peer).unwrap();
    assert!(
        pending.hashes.contains(&hash1),
        "Previously known hash should be included after affinity change"
    );
    assert!(pending.hashes.contains(&hash2), "Unknown hash should be included in initial sync");
    let peer_data = handler.peers.get(&peer).unwrap();
    assert!(
        !peer_data.known_statements.contains(&hash1),
        "known_statements should be cleared after schedule_initial_sync_for_peer"
    );
}
#[tokio::test]
async fn test_malformed_v2_message_does_not_panic() {
    // Undecodable payloads from a V2 peer must be tolerated: no panic, and
    // the peer stays connected.
    let (mut handler, _statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let peer = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;

    // Arbitrary garbage bytes.
    handler
        .handle_notification_event(NotificationEvent::NotificationReceived {
            peer,
            notification: vec![0xFF, 0xFE, 0xFD].into(),
        })
        .await;

    // A raw V1-style Vec<Statement> payload (not a V2 envelope).
    let mut stmt = Statement::new();
    stmt.set_plain_data(b"v1 encoded".to_vec());
    let v1_payload = vec![stmt].encode();
    handler
        .handle_notification_event(NotificationEvent::NotificationReceived {
            peer,
            notification: v1_payload.into(),
        })
        .await;

    assert!(handler.peers.contains_key(&peer), "Peer should still be connected");
}
#[test]
fn test_find_sendable_chunk_v2_overhead() {
    // The V2 envelope carries a one-byte variant index on top of the V1
    // encoding, so V2's maximum statement payload is exactly one byte
    // smaller, and a chunk can never hold more statements under V2 than V1.
    let v1_max = max_statement_payload_size(V1_ENVELOPE_OVERHEAD);
    let v2_max = max_statement_payload_size(V2_ENVELOPE_OVERHEAD);
    assert!(
        v2_max < v1_max,
        "V2 payload capacity ({v2_max}) should be less than V1 ({v1_max})"
    );
    assert_eq!(v1_max - v2_max, 1, "V2 overhead is exactly 1 byte more than V1");

    let stmts: Vec<Statement> = (0..1000)
        .map(|i| {
            let mut s = Statement::new();
            s.set_plain_data(format!("stmt-{i}").into_bytes());
            s
        })
        .collect();
    let refs: Vec<&Statement> = stmts.iter().collect();
    let v1_count = match find_sendable_chunk(&refs, V1_ENVELOPE_OVERHEAD) {
        ChunkResult::Send(count) => count,
        _ => panic!("Expected Send for V1"),
    };
    let v2_count = match find_sendable_chunk(&refs, V2_ENVELOPE_OVERHEAD) {
        ChunkResult::Send(count) => count,
        _ => panic!("Expected Send for V2"),
    };
    assert!(
        v2_count <= v1_count,
        "V2 ({v2_count}) should fit at most as many statements as V1 ({v1_count})"
    );
}
#[tokio::test]
async fn test_full_node_v2_gets_initial_sync_immediately() {
    // Unlike light V2 peers (which wait for an affinity announcement), a
    // full-node V2 peer gets its initial sync scheduled right on connect.
    let (mut handler, statement_store, _network, _notification_service) =
        build_handler_no_peers();
    let mut stmt = Statement::new();
    stmt.set_plain_data(b"full node v2".to_vec());
    let hash = stmt.hash();
    statement_store.statements.lock().unwrap().insert(hash, stmt);

    let peer = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer,
            direction: sc_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;

    assert!(
        handler.pending_initial_syncs.contains_key(&peer),
        "Full-node V2 peer should have initial sync scheduled immediately"
    );
    let peer_state = handler.peers.get(&peer).unwrap();
    assert_eq!(peer_state.protocol_version, PeerProtocolVersion::V2);
    assert!(!peer_state.is_light);
}
#[tokio::test]
async fn test_propagation_reaches_all_connected_peers() {
	let (
		mut handler,
		statement_store,
		_network,
		notification_service,
		_queue_receiver,
		peer_ids,
	) = build_handler(5);

	// Queue three distinct statements as "recent", i.e. due for propagation.
	let mut expected_hashes: Vec<_> = (0..3u8)
		.map(|i| {
			let mut statement = Statement::new();
			statement.set_plain_data(vec![i; 100]);
			let hash = statement.hash();
			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
			hash
		})
		.collect();
	expected_hashes.sort();

	handler.propagate_statements().await;

	// Every connected peer must have been sent exactly those three statements.
	let sent = notification_service.get_sent_notifications();
	for peer_id in &peer_ids {
		let mut received_hashes = get_peer_hashes(&sent, *peer_id);
		received_hashes.sort();
		assert_eq!(
			received_hashes, expected_hashes,
			"Peer {peer_id} should have received all 3 statements"
		);
	}

	// Propagation drains the recent-statements queue.
	assert!(statement_store.recent_statements.lock().unwrap().is_empty());
}
#[tokio::test]
async fn test_known_statement_filtering_per_peer() {
	let (
		mut handler,
		statement_store,
		_network,
		notification_service,
		_queue_receiver,
		peer_ids,
	) = build_handler(3);
	let (peer_a, peer_b, peer_c) = (peer_ids[0], peer_ids[1], peer_ids[2]);

	// Five pending statements awaiting propagation.
	let hashes: Vec<_> = (0..5u8)
		.map(|i| {
			let mut statement = Statement::new();
			statement.set_plain_data(vec![i; 100]);
			let hash = statement.hash();
			statement_store.recent_statements.lock().unwrap().insert(hash, statement);
			hash
		})
		.collect();

	// Seed per-peer knowledge: peer_a already knows the first two statements,
	// peer_b knows the third, peer_c knows none.
	for (peer, idx) in [(peer_a, 0), (peer_a, 1), (peer_b, 2)] {
		handler.peers.get_mut(&peer).unwrap().known_statements.insert(hashes[idx]);
	}

	handler.propagate_statements().await;
	let sent = notification_service.get_sent_notifications();
	let peer_a_hashes = get_peer_hashes(&sent, peer_a);
	let peer_b_hashes = get_peer_hashes(&sent, peer_b);
	let peer_c_hashes = get_peer_hashes(&sent, peer_c);

	// peer_a: everything except the two it already knows.
	assert_eq!(peer_a_hashes.len(), 3, "peer_a should get 3 statements");
	assert!(!peer_a_hashes.contains(&hashes[0]), "peer_a already knows s1");
	assert!(!peer_a_hashes.contains(&hashes[1]), "peer_a already knows s2");
	assert!(peer_a_hashes.contains(&hashes[2]));
	assert!(peer_a_hashes.contains(&hashes[3]));
	assert!(peer_a_hashes.contains(&hashes[4]));

	// peer_b: everything except the single one it already knows.
	assert_eq!(peer_b_hashes.len(), 4, "peer_b should get 4 statements");
	assert!(!peer_b_hashes.contains(&hashes[2]), "peer_b already knows s3");
	assert!(peer_b_hashes.contains(&hashes[0]));
	assert!(peer_b_hashes.contains(&hashes[1]));
	assert!(peer_b_hashes.contains(&hashes[3]));
	assert!(peer_b_hashes.contains(&hashes[4]));

	// peer_c: the full set.
	let mut sorted_peer_c: Vec<_> = peer_c_hashes.into_iter().collect();
	sorted_peer_c.sort();
	let mut all_hashes = hashes.clone();
	all_hashes.sort();
	assert_eq!(sorted_peer_c, all_hashes, "peer_c should get all 5 statements");
}
#[test]
fn major_sync_defers_peers_and_handles_disconnect() {
	// While a major sync is in progress, sync-announced peers must be buffered
	// in `deferred_peers` instead of being added to the network's reserved set,
	// and a disconnect must drop the peer from that buffer without issuing a
	// reserved-set removal.
	let (sync, _flag) = TestSync::with_syncing(true);
	let network = TestNetwork::new();
	let notification_service = TestNotificationService::new();
	let statement_store = TestStatementStore::new();
	let (queue_sender, _queue_receiver) = async_channel::bounded(100);
	// Hand-assembled handler: all streams/futures are `pending()` so only the
	// explicit `handle_sync_event` calls below drive state changes.
	let mut handler = StatementHandler {
		protocol_name: "/statement/1".into(),
		notification_service: Box::new(notification_service),
		propagate_timeout: (Box::pin(futures::stream::pending())
			as Pin<Box<dyn Stream<Item = ()> + Send>>)
			.fuse(),
		pending_statements: FuturesUnordered::new(),
		pending_statements_peers: HashMap::new(),
		network: network.clone(),
		sync,
		sync_event_stream: (Box::pin(futures::stream::pending())
			as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
			.fuse(),
		peers: HashMap::new(),
		statement_store: Arc::new(statement_store),
		queue_sender,
		statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
			.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
		metrics: None,
		initial_sync_timeout: Box::pin(futures::future::pending()),
		pending_affinities_timeout: Box::pin(futures::future::pending()),
		pending_initial_syncs: HashMap::new(),
		initial_sync_peer_queue: VecDeque::new(),
		deferred_peers: HashSet::new(),
		dropped_statements_during_sync: false,
		sync_recovery_peer: None,
		sync_recovery_readd_timeout: Box::pin(pending().fuse()),
	};
	let peer1 = PeerId::random();
	let peer2 = PeerId::random();
	let peer3 = PeerId::random();
	// Three peers announced by sync while the node is still major-syncing.
	handler.handle_sync_event(SyncEvent::PeerConnected(peer1));
	handler.handle_sync_event(SyncEvent::PeerConnected(peer2));
	handler.handle_sync_event(SyncEvent::PeerConnected(peer3));
	// No reserved-set traffic yet: all three are only buffered.
	assert!(network.get_added_reserved().is_empty());
	assert!(network.get_removed_reserved().is_empty());
	assert_eq!(handler.deferred_peers.len(), 3);
	// Disconnecting a buffered peer removes it from the buffer only.
	handler.handle_sync_event(SyncEvent::PeerDisconnected(peer1));
	assert_eq!(handler.deferred_peers.len(), 2);
	assert!(!handler.deferred_peers.contains(&peer1), "disconnected peer must leave buffer");
	assert!(handler.deferred_peers.contains(&peer2));
	assert!(handler.deferred_peers.contains(&peer3));
	assert!(network.get_removed_reserved().is_empty(), "no remove call for buffered peer");
}
#[test]
fn deferred_peers_flushed_on_sync_end_without_remove() {
	// Once major sync ends, all buffered (deferred) peers must be flushed into
	// the reserved set in a single add call, with no removal calls issued.
	let (sync, flag) = TestSync::with_syncing(true);
	let network = TestNetwork::new();
	let notification_service = TestNotificationService::new();
	let statement_store = TestStatementStore::new();
	let (queue_sender, _queue_receiver) = async_channel::bounded(100);
	// Pre-seed the deferred buffer with two peers, as if they had connected
	// while the node was still major-syncing.
	let peer1 = PeerId::random();
	let peer2 = PeerId::random();
	let mut deferred = HashSet::new();
	deferred.insert(peer1);
	deferred.insert(peer2);
	// Hand-assembled handler: all streams/futures are `pending()` so only the
	// explicit calls below drive state changes.
	let mut handler = StatementHandler {
		protocol_name: "/statement/1".into(),
		notification_service: Box::new(notification_service),
		propagate_timeout: (Box::pin(futures::stream::pending())
			as Pin<Box<dyn Stream<Item = ()> + Send>>)
			.fuse(),
		pending_statements: FuturesUnordered::new(),
		pending_statements_peers: HashMap::new(),
		network: network.clone(),
		sync,
		sync_event_stream: (Box::pin(futures::stream::pending())
			as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
			.fuse(),
		peers: HashMap::new(),
		statement_store: Arc::new(statement_store),
		queue_sender,
		statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
			.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
		metrics: None,
		initial_sync_timeout: Box::pin(futures::future::pending()),
		pending_affinities_timeout: Box::pin(futures::future::pending()),
		pending_initial_syncs: HashMap::new(),
		initial_sync_peer_queue: VecDeque::new(),
		deferred_peers: deferred,
		dropped_statements_during_sync: false,
		sync_recovery_peer: None,
		sync_recovery_readd_timeout: Box::pin(pending().fuse()),
	};
	// Flip the shared flag so `TestSync` now reports "not syncing".
	flag.store(false, std::sync::atomic::Ordering::Relaxed);
	handler.drain_deferred_peers();
	assert!(handler.deferred_peers.is_empty());
	// Exactly one add_reserved call, containing both peers as P2P multiaddrs.
	let added = network.get_added_reserved();
	assert_eq!(added.len(), 1);
	let added_addrs = &added[0];
	let expected_addr1: sc_network::Multiaddr =
		iter::once(multiaddr::Protocol::P2p(peer1.into())).collect();
	let expected_addr2: sc_network::Multiaddr =
		iter::once(multiaddr::Protocol::P2p(peer2.into())).collect();
	assert!(added_addrs.contains(&expected_addr1), "peer1 must be in added set");
	assert!(added_addrs.contains(&expected_addr2), "peer2 must be in added set");
	assert!(network.get_removed_reserved().is_empty());
}
#[tokio::test]
async fn sync_recovery_schedules_remove_for_one_connected_peer() {
	// With `dropped_statements_during_sync` set, starting sync recovery must
	// remove exactly one currently-connected peer from the reserved set,
	// remember it in `sync_recovery_peer`, and later re-add that same peer.
	// Also checks the re-entry guard: a second `start_sync_recovery` while a
	// recovery is in flight must be a no-op.
	let network = TestNetwork::new();
	let notification_service = TestNotificationService::new();
	let (sync, _flag) = TestSync::with_syncing(false);
	let (queue_sender, _) = async_channel::bounded(2);
	let statement_store = TestStatementStore::new();
	// A single connected V1 full-node peer for recovery to pick.
	let connected_peer = PeerId::random();
	let mut peers = HashMap::new();
	peers.insert(
		connected_peer,
		Peer {
			known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
			rate_limiter: PeerRateLimiter::new(
				NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
				NonZeroU32::new(
					DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
				)
				.expect("burst capacity is nonzero"),
			),
			protocol_version: PeerProtocolVersion::V1,
			topic_affinity: None,
			is_light: false,
			pending_topic_affinity: None,
		},
	);
	// Hand-assembled handler; `dropped_statements_during_sync: true` arms the
	// recovery path exercised below.
	let mut handler = StatementHandler {
		protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
		notification_service: Box::new(notification_service),
		propagate_timeout: (Box::pin(futures::stream::pending())
			as Pin<Box<dyn Stream<Item = ()> + Send>>)
			.fuse(),
		pending_statements: FuturesUnordered::new(),
		pending_statements_peers: HashMap::new(),
		network: network.clone(),
		sync,
		sync_event_stream: (Box::pin(futures::stream::pending())
			as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
			.fuse(),
		peers,
		statement_store: Arc::new(statement_store),
		queue_sender,
		statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
			.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
		metrics: None,
		initial_sync_timeout: Box::pin(futures::future::pending()),
		pending_affinities_timeout: Box::pin(futures::future::pending()),
		pending_initial_syncs: HashMap::new(),
		initial_sync_peer_queue: VecDeque::new(),
		deferred_peers: HashSet::new(),
		dropped_statements_during_sync: true,
		sync_recovery_peer: None,
		sync_recovery_readd_timeout: Box::pin(futures::future::pending()),
	};
	handler.start_sync_recovery();
	// Exactly one reserved-set removal, targeting the connected peer.
	{
		let removed = network.removed_reserved.lock().unwrap();
		assert_eq!(
			removed.len(),
			1,
			"Expected exactly one remove_peers_from_reserved_set call"
		);
		assert!(removed[0].contains(&connected_peer));
	}
	assert_eq!(handler.sync_recovery_peer, Some(connected_peer));
	// Re-adding clears the recovery slot and adds the peer's P2P multiaddr back.
	handler.try_readd_sync_recovery_peer();
	assert!(handler.sync_recovery_peer.is_none());
	{
		let added = network.added_reserved.lock().unwrap();
		assert_eq!(added.len(), 1);
		let expected_addr: multiaddr::Multiaddr =
			iter::once(multiaddr::Protocol::P2p(connected_peer.into())).collect();
		assert!(added[0].contains(&expected_addr));
	}
	// Re-entry guard: with a recovery peer already set, starting again must
	// neither replace the peer nor issue another removal.
	{
		let peer2 = PeerId::random();
		handler.sync_recovery_peer = Some(peer2);
		handler.start_sync_recovery();
		assert_eq!(
			handler.sync_recovery_peer,
			Some(peer2),
			"Re-entry guard: recovery peer must not change on second call"
		);
		assert_eq!(
			network.removed_reserved.lock().unwrap().len(),
			1,
			"Re-entry guard: no extra remove call while recovery is in flight"
		);
	}
}
#[tokio::test]
async fn sync_recovery_gated_by_dropped_statements_flag() {
	// `start_sync_recovery` must be a no-op unless statements were actually
	// dropped during major sync (`dropped_statements_during_sync == true`).
	// Builds two otherwise-identical handlers that differ only in that flag.
	let make_peer = || Peer {
		known_statements: LruHashSet::new(NonZeroUsize::new(1024).unwrap()),
		rate_limiter: PeerRateLimiter::new(
			NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
				.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
			NonZeroU32::new(
				DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT,
			)
			.expect("burst capacity is nonzero"),
		),
		protocol_version: PeerProtocolVersion::V1,
		topic_affinity: None,
		is_light: false,
		pending_topic_affinity: None,
	};
	// Factory producing a handler with one connected peer and the given value
	// for `dropped_statements_during_sync`; all streams/futures are pending.
	let make_handler =
		|network: TestNetwork, dropped: bool| -> StatementHandler<TestNetwork, TestSync> {
			let (sync, _) = TestSync::with_syncing(false);
			let (queue_sender, _) = async_channel::bounded(2);
			let mut peers = HashMap::new();
			peers.insert(PeerId::random(), make_peer());
			StatementHandler {
				protocol_name: format!("/{STATEMENT_PROTOCOL_V1}").into(),
				notification_service: Box::new(TestNotificationService::new()),
				propagate_timeout: (Box::pin(futures::stream::pending())
					as Pin<Box<dyn Stream<Item = ()> + Send>>)
					.fuse(),
				pending_statements: FuturesUnordered::new(),
				pending_statements_peers: HashMap::new(),
				network,
				sync,
				sync_event_stream: (Box::pin(futures::stream::pending())
					as Pin<Box<dyn Stream<Item = sc_network_sync::types::SyncEvent> + Send>>)
					.fuse(),
				peers,
				statement_store: Arc::new(TestStatementStore::new()),
				queue_sender,
				statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
					.expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
				metrics: None,
				initial_sync_timeout: Box::pin(futures::future::pending()),
				pending_affinities_timeout: Box::pin(futures::future::pending()),
				pending_initial_syncs: HashMap::new(),
				initial_sync_peer_queue: VecDeque::new(),
				deferred_peers: HashSet::new(),
				dropped_statements_during_sync: dropped,
				sync_recovery_peer: None,
				sync_recovery_readd_timeout: Box::pin(pending().fuse()),
			}
		};
	// Flag clear: recovery must not start and no removals may be issued.
	let net = TestNetwork::new();
	let mut handler = make_handler(net.clone(), false);
	handler.start_sync_recovery();
	assert!(handler.sync_recovery_peer.is_none());
	assert!(net.get_removed_reserved().is_empty());
	// Flag set: recovery picks a peer and issues exactly one removal.
	let net2 = TestNetwork::new();
	let mut handler2 = make_handler(net2.clone(), true);
	handler2.start_sync_recovery();
	assert!(handler2.sync_recovery_peer.is_some());
	assert_eq!(net2.get_removed_reserved().len(), 1);
}
}