use self::config::*;
use codec::{Compact, Decode, Encode, MaxEncodedLen};
#[cfg(any(test, feature = "test-helpers"))]
use futures::future::pending;
use futures::{channel::oneshot, future::FusedFuture, prelude::*, stream::FuturesUnordered};
use governor::{
clock::DefaultClock,
state::{InMemoryState, NotKeyed},
Quota, RateLimiter,
};
use soil_prometheus::{
exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
Registry, U64,
};
use soil_network::sync::{SyncEvent, SyncEventStream};
use soil_network::types::PeerId;
use soil_network::{
config::{NonReservedPeerMode, SetConfig},
error, multiaddr,
peer_store::PeerStoreProvider,
service::{
traits::{NotificationEvent, NotificationService, ValidationResult},
NotificationMetrics,
},
types::ProtocolName,
utils::{interval, LruHashSet},
NetworkBackend, NetworkEventStream, NetworkPeers,
};
use soil_statement_store::{
FilterDecision, Hash, Statement, StatementSource, StatementStore, SubmitResult,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
iter,
num::{NonZeroU32, NonZeroUsize},
pin::Pin,
sync::Arc,
time::Instant,
};
use subsoil::runtime::traits::Block as BlockT;
use tokio::time::timeout;
/// Configuration constants for the statement gossip protocol.
pub mod config;
/// A batch of statements as encoded and exchanged on the wire.
pub type Statements = Vec<Statement>;
/// Future resolving to the outcome of submitting a statement to the store.
pub type StatementImportFuture = oneshot::Receiver<SubmitResult>;
/// Reputation changes applied to peers based on the statements they send us.
mod rep {
    use soil_network::ReputationChange as Rep;
    /// Up-front cost charged for any statement received from a peer.
    pub const ANY_STATEMENT: Rep = Rep::new(-(1 << 4), "Any statement");
    /// Refund of [`ANY_STATEMENT`] when the statement turns out to be already known.
    pub const ANY_STATEMENT_REFUND: Rep = Rep::new(1 << 4, "Any statement (refund)");
    /// Benefit for a statement that was imported as new.
    pub const GOOD_STATEMENT: Rep = Rep::new(1 << 8, "Good statement");
    /// Cost for a statement the store deemed invalid.
    pub const INVALID_STATEMENT: Rep = Rep::new(-(1 << 12), "Invalid statement");
    /// Cost for sending the same pending statement again from the same peer.
    pub const DUPLICATE_STATEMENT: Rep = Rep::new(-(1 << 7), "Duplicate statement");
    /// Fatal cost applied when a peer exceeds the statement rate limit.
    pub const STATEMENT_FLOODING: Rep = Rep::new_fatal("Statement flooding");
}
/// Log target used by this module.
const LOG_TARGET: &str = "statement-gossip";
/// Maximum time allowed for a single notification send before it counts as failed.
const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Interval between initial-sync burst rounds towards newly connected peers.
const INITIAL_SYNC_BURST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
/// Prometheus metrics for the statement gossip protocol.
struct Metrics {
    // Statements propagated to at least one peer.
    propagated_statements: Counter<U64>,
    // Received statements that were already in the statement store.
    known_statements_received: Counter<U64>,
    // Statements skipped because a single statement exceeds the notification size.
    skipped_oversized_statements: Counter<U64>,
    // Distribution of chunk sizes when propagating statements.
    propagated_statements_chunks: Histogram,
    // Number of statement validations currently pending.
    pending_statements: Gauge<U64>,
    // Statements ignored due to exceeding the MAX_PENDING_STATEMENTS limit.
    ignored_statements: Counter<U64>,
    // Peers currently connected on the statement protocol.
    peers_connected: Gauge<U64>,
    // Total statements received from peers.
    statements_received: Counter<U64>,
    // Total bytes sent for statement protocol messages.
    bytes_sent_total: Counter<U64>,
    // Total bytes received for statement protocol messages.
    bytes_received_total: Counter<U64>,
    // Time taken to send statement messages to peers.
    sent_latency_seconds: Histogram,
    // Statements sent during initial sync bursts to newly connected peers.
    initial_sync_statements_sent: Counter<U64>,
    // Number of initial sync burst rounds processed.
    initial_sync_bursts_total: Counter<U64>,
    // Peers currently being synced via initial sync.
    initial_sync_peers_active: Gauge<U64>,
    // Per-peer total duration of initial sync from start to completion.
    initial_sync_duration_seconds: Histogram,
    // Peers disconnected for exceeding statement rate limits.
    statement_flooding_detected: Counter<U64>,
}
impl Metrics {
    /// Create all metrics and register them with the given registry.
    ///
    /// Fails if any metric cannot be constructed or registered (e.g. a
    /// duplicate name already exists in `r`).
    fn register(r: &Registry) -> Result<Self, PrometheusError> {
        Ok(Self {
            propagated_statements: register(
                Counter::new(
                    "substrate_sync_propagated_statements",
                    "Number of statements propagated to at least one peer",
                )?,
                r,
            )?,
            known_statements_received: register(
                Counter::new(
                    // NOTE(review): name uses singular "statement" unlike the field;
                    // renaming would break existing dashboards, so it is kept as-is.
                    "substrate_sync_known_statement_received",
                    "Number of statements received via gossiping that were already in the statement store",
                )?,
                r,
            )?,
            skipped_oversized_statements: register(
                Counter::new(
                    "substrate_sync_skipped_oversized_statements",
                    "Number of oversized statements that were skipped to be gossiped",
                )?,
                r,
            )?,
            propagated_statements_chunks: register(
                Histogram::with_opts(
                    HistogramOpts::new(
                        "substrate_sync_propagated_statements_chunks",
                        "Distribution of chunk sizes when propagating statements",
                    )
                    // Powers of two from 1 to 8192.
                    .buckets(exponential_buckets(1.0, 2.0, 14)?),
                )?,
                r,
            )?,
            pending_statements: register(
                Gauge::new(
                    "substrate_sync_pending_statement_validations",
                    "Number of pending statement validations",
                )?,
                r,
            )?,
            ignored_statements: register(
                Counter::new(
                    "substrate_sync_ignored_statements",
                    "Number of statements ignored due to exceeding MAX_PENDING_STATEMENTS limit",
                )?,
                r,
            )?,
            peers_connected: register(
                Gauge::new(
                    "substrate_sync_statement_peers_connected",
                    "Number of peers connected using the statement protocol",
                )?,
                r,
            )?,
            statements_received: register(
                Counter::new(
                    "substrate_sync_statements_received",
                    "Total number of statements received from peers",
                )?,
                r,
            )?,
            bytes_sent_total: register(
                Counter::new(
                    "substrate_sync_statement_bytes_sent_total",
                    "Total bytes sent for statement protocol messages",
                )?,
                r,
            )?,
            bytes_received_total: register(
                Counter::new(
                    "substrate_sync_statement_bytes_received_total",
                    "Total bytes received for statement protocol messages",
                )?,
                r,
            )?,
            sent_latency_seconds: register(
                Histogram::with_opts(
                    HistogramOpts::new(
                        "substrate_sync_statement_sent_latency_seconds",
                        "Time to send statement messages to peers",
                    )
                    // Microsecond to one-second resolution.
                    .buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
                )?,
                r,
            )?,
            initial_sync_statements_sent: register(
                Counter::new(
                    "substrate_sync_initial_sync_statements_sent",
                    "Total statements sent during initial sync bursts to newly connected peers",
                )?,
                r,
            )?,
            initial_sync_bursts_total: register(
                Counter::new(
                    "substrate_sync_initial_sync_bursts_total",
                    "Total number of initial sync burst rounds processed",
                )?,
                r,
            )?,
            initial_sync_peers_active: register(
                Gauge::new(
                    "substrate_sync_initial_sync_peers_active",
                    "Number of peers currently being synced via initial sync",
                )?,
                r,
            )?,
            initial_sync_duration_seconds: register(
                Histogram::with_opts(
                    HistogramOpts::new(
                        "substrate_sync_initial_sync_duration_seconds",
                        "Per-peer total duration of initial sync from start to completion",
                    )
                    .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
                )?,
                r,
            )?,
            statement_flooding_detected: register(
                Counter::new(
                    "substrate_sync_statement_flooding_detected",
                    "Number of peers disconnected for exceeding statement rate limits",
                )?,
                r,
            )?,
        })
    }
}
/// Intermediate form of the statement handler: created first so the
/// notification protocol config can be registered with the network backend,
/// then turned into the running handler via [`StatementHandlerPrototype::build`].
pub struct StatementHandlerPrototype {
    // Full protocol name, e.g. "/<genesis-hash>/statement/1".
    protocol_name: ProtocolName,
    // Handle used to send and receive statement notifications.
    notification_service: Box<dyn NotificationService>,
}
impl StatementHandlerPrototype {
    /// Create a new prototype together with the notification protocol
    /// configuration that must be registered with the network backend.
    ///
    /// The protocol name is derived from `genesis_hash` and, when present,
    /// `fork_id`.
    pub fn new<
        Hash: AsRef<[u8]>,
        Block: BlockT,
        Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
    >(
        genesis_hash: Hash,
        fork_id: Option<&str>,
        metrics: NotificationMetrics,
        peer_store_handle: Arc<dyn PeerStoreProvider>,
    ) -> (Self, Net::NotificationProtocolConfig) {
        let genesis_hash = genesis_hash.as_ref();
        let protocol_name = if let Some(fork_id) = fork_id {
            format!("/{}/{}/statement/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
        } else {
            format!("/{}/statement/1", array_bytes::bytes2hex("", genesis_hash))
        };
        // Peer slots are managed exclusively through the reserved set (see
        // `handle_sync_event`), hence zero in/out peers and `Deny`.
        let (config, notification_service) = Net::notification_config(
            protocol_name.clone().into(),
            Vec::new(),
            MAX_STATEMENT_NOTIFICATION_SIZE,
            None,
            SetConfig {
                in_peers: 0,
                out_peers: 0,
                reserved_nodes: Vec::new(),
                non_reserved_mode: NonReservedPeerMode::Deny,
            },
            metrics,
            peer_store_handle,
        );
        (Self { protocol_name: protocol_name.into(), notification_service }, config)
    }
    /// Turn the prototype into the actual statement handler.
    ///
    /// Spawns `num_submission_workers` background tasks through `executor`,
    /// each draining the shared validation queue and submitting statements to
    /// the store. Zero values for `num_submission_workers` /
    /// `statements_per_second` fall back to sane defaults with a warning.
    ///
    /// Returns an error if metric registration fails.
    pub fn build<
        N: NetworkPeers + NetworkEventStream,
        S: SyncEventStream + soil_client::consensus::SyncOracle,
    >(
        self,
        network: N,
        sync: S,
        statement_store: Arc<dyn StatementStore>,
        metrics_registry: Option<&Registry>,
        executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send,
        mut num_submission_workers: usize,
        statements_per_second: u32,
    ) -> error::Result<StatementHandler<N, S>> {
        let sync_event_stream = sync.event_stream("statement-handler-sync");
        let (queue_sender, queue_receiver) = async_channel::bounded(MAX_PENDING_STATEMENTS);
        if num_submission_workers == 0 {
            log::warn!(
                target: LOG_TARGET,
                "num_submission_workers is 0, defaulting to 1"
            );
            num_submission_workers = 1;
        }
        let statements_per_second = match NonZeroU32::new(statements_per_second) {
            Some(rate) => rate,
            None => {
                log::warn!(
                    target: LOG_TARGET,
                    "statements_per_second is 0, defaulting to {}",
                    DEFAULT_STATEMENTS_PER_SECOND
                );
                NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
                    .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero")
            },
        };
        // Register Prometheus metrics only when a registry was supplied,
        // propagating any registration error.
        let metrics = metrics_registry.map(Metrics::register).transpose()?;
        for _ in 0..num_submission_workers {
            let store = statement_store.clone();
            let mut queue_receiver = queue_receiver.clone();
            executor(
                async move {
                    // Each worker drains the shared queue until every sender is dropped.
                    loop {
                        let task: Option<(Statement, oneshot::Sender<SubmitResult>)> =
                            queue_receiver.next().await;
                        match task {
                            None => return,
                            Some((statement, completion)) => {
                                let result = store.submit(statement, StatementSource::Network);
                                // The receiver may have been dropped; this is not fatal.
                                if completion.send(result).is_err() {
                                    log::debug!(
                                        target: LOG_TARGET,
                                        "Error sending validation completion"
                                    );
                                }
                            },
                        }
                    }
                }
                .boxed(),
            );
        }
        let handler = StatementHandler {
            protocol_name: self.protocol_name,
            notification_service: self.notification_service,
            propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
                as Pin<Box<dyn Stream<Item = ()> + Send>>)
                .fuse(),
            pending_statements: FuturesUnordered::new(),
            pending_statements_peers: HashMap::new(),
            network,
            sync,
            sync_event_stream: sync_event_stream.fuse(),
            peers: HashMap::new(),
            statement_store,
            queue_sender,
            statements_per_second,
            metrics,
            initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
            pending_initial_syncs: HashMap::new(),
            initial_sync_peer_queue: VecDeque::new(),
        };
        Ok(handler)
    }
}
/// Gossip handler for the statement protocol.
pub struct StatementHandler<
    N: NetworkPeers + NetworkEventStream,
    S: SyncEventStream + soil_client::consensus::SyncOracle,
> {
    protocol_name: ProtocolName,
    // Fires periodically to trigger propagation of recently stored statements.
    propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
    // In-flight validations; each resolves to the statement hash and, when the
    // worker completed, its submit result.
    pending_statements:
        FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>,
    // For each pending statement, the peers that sent it to us.
    pending_statements_peers: HashMap<Hash, HashSet<PeerId>>,
    network: N,
    sync: S,
    sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
    notification_service: Box<dyn NotificationService>,
    // Per-peer state for everyone connected on the statement protocol.
    peers: HashMap<PeerId, Peer>,
    statement_store: Arc<dyn StatementStore>,
    // Work queue feeding the submission workers spawned in `build`.
    queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
    // Sustained per-peer rate limit applied to incoming statements.
    statements_per_second: NonZeroU32,
    metrics: Option<Metrics>,
    // Timer driving `process_initial_sync_burst`.
    initial_sync_timeout: Pin<Box<dyn FusedFuture<Output = ()> + Send>>,
    // Remaining statement hashes to send to each newly connected peer.
    pending_initial_syncs: HashMap<PeerId, PendingInitialSync>,
    // Round-robin queue of peers awaiting their next initial-sync burst.
    initial_sync_peer_queue: VecDeque<PeerId>,
}
/// Per-peer rate limiter guarding against statement flooding.
#[derive(Debug)]
struct PeerRateLimiter {
    limiter: RateLimiter<NotKeyed, InMemoryState, DefaultClock>,
}
impl PeerRateLimiter {
fn new(statements_per_second: NonZeroU32, burst: NonZeroU32) -> Self {
let quota = Quota::per_second(statements_per_second).allow_burst(burst);
Self { limiter: RateLimiter::direct(quota) }
}
fn is_flooding(&self, count: usize) -> bool {
if count > u32::MAX as usize {
return true;
}
let Some(n) = NonZeroU32::new(count as u32) else {
return false;
};
!matches!(self.limiter.check_n(n), Ok(Ok(())))
}
}
/// State kept for each peer connected on the statement protocol.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
#[derive(Debug)]
pub struct Peer {
    // Statements this peer is known to have (bounded LRU set of hashes).
    known_statements: LruHashSet<Hash>,
    // Limits how fast this peer may send us statements.
    rate_limiter: PeerRateLimiter,
}
/// Progress of the initial statement sync towards a newly connected peer.
struct PendingInitialSync {
    // Statement hashes still to be sent to the peer.
    hashes: Vec<Hash>,
    // When the sync started; used for the duration metric.
    started_at: Instant,
}
/// Outcome of sizing a chunk of statements for one notification.
enum ChunkResult {
    // Number of leading statements whose combined size fits.
    Send(usize),
    // The first statement alone exceeds the payload limit.
    SkipOversized,
}
/// Outcome of attempting to send one chunk of statements to a peer.
enum SendChunkResult {
    // Number of statements actually sent.
    Sent(usize),
    // A single oversized statement was skipped.
    Skipped,
    // There was nothing to send.
    Empty,
    // Sending failed or timed out.
    Failed,
}
/// Maximum number of statement bytes that fit into one notification, leaving
/// room for the length prefix of the encoded statement list.
fn max_statement_payload_size() -> usize {
    let len_prefix = Compact::<u32>::max_encoded_len();
    MAX_STATEMENT_NOTIFICATION_SIZE as usize - len_prefix
}
/// Determine how many statements from the front of `statements` fit into a
/// single notification payload.
///
/// Returns [`ChunkResult::Send`] with the number of leading statements whose
/// combined encoded size stays within [`max_statement_payload_size`], or
/// [`ChunkResult::SkipOversized`] when even the first statement alone is too
/// large. An empty slice yields `Send(0)`.
fn find_sendable_chunk(statements: &[&Statement]) -> ChunkResult {
    if statements.is_empty() {
        return ChunkResult::Send(0);
    }
    let max_size = max_statement_payload_size();
    let mut accumulated_size = 0usize;
    let mut count = 0usize;
    // Iterate directly over the slice (the original `&statements[0..]` was a
    // redundant full-range re-slice) and drop the dead `new_count` temporary.
    for stmt in statements {
        let stmt_size = stmt.encoded_size();
        if accumulated_size + stmt_size > max_size {
            break;
        }
        accumulated_size += stmt_size;
        count += 1;
    }
    if count == 0 {
        ChunkResult::SkipOversized
    } else {
        ChunkResult::Send(count)
    }
}
impl Peer {
    /// Construct a peer entry directly for tests, bypassing the normal
    /// connection path.
    #[cfg(any(test, feature = "test-helpers"))]
    pub fn new_for_testing(
        known_statements: LruHashSet<Hash>,
        statements_per_second: NonZeroU32,
        burst: NonZeroU32,
    ) -> Self {
        let rate_limiter = PeerRateLimiter::new(statements_per_second, burst);
        Self { known_statements, rate_limiter }
    }
}
impl<N, S> StatementHandler<N, S>
where
N: NetworkPeers + NetworkEventStream,
S: SyncEventStream + soil_client::consensus::SyncOracle,
{
/// Construct a handler directly from its parts for tests, with metrics
/// disabled and an initial-sync timer that never fires (tests drive bursts
/// manually).
#[cfg(any(test, feature = "test-helpers"))]
pub fn new_for_testing(
    protocol_name: ProtocolName,
    notification_service: Box<dyn NotificationService>,
    propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
    network: N,
    sync: S,
    sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
    peers: HashMap<PeerId, Peer>,
    statement_store: Arc<dyn StatementStore>,
    queue_sender: async_channel::Sender<(Statement, oneshot::Sender<SubmitResult>)>,
    statements_per_second: NonZeroU32,
) -> Self {
    Self {
        protocol_name,
        notification_service,
        propagate_timeout,
        pending_statements: FuturesUnordered::new(),
        pending_statements_peers: HashMap::new(),
        network,
        sync,
        sync_event_stream,
        peers,
        statement_store,
        queue_sender,
        statements_per_second,
        metrics: None,
        // `pending()` never resolves, so the burst arm in `run` never fires.
        initial_sync_timeout: Box::pin(pending().fuse()),
        pending_initial_syncs: HashMap::new(),
        initial_sync_peer_queue: VecDeque::new(),
    }
}
/// Mutable access to the in-flight validation futures (test helper).
#[cfg(any(test, feature = "test-helpers"))]
pub fn pending_statements_mut(
    &mut self,
) -> &mut FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, Option<SubmitResult>)> + Send>>>
{
    &mut self.pending_statements
}
/// Drive the handler: processes periodic propagation, finished validations,
/// sync events, notification events and initial-sync bursts until one of the
/// event streams terminates.
pub async fn run(mut self) {
    loop {
        futures::select_biased! {
            // Periodic gossip of recently stored statements.
            _ = self.propagate_timeout.next() => {
                self.propagate_statements().await;
                self.metrics.as_ref().map(|metrics| {
                    metrics.pending_statements.set(self.pending_statements.len() as u64);
                });
            },
            // A queued statement finished validation: adjust the reputation of
            // every peer that sent it.
            (hash, result) = self.pending_statements.select_next_some() => {
                if let Some(peers) = self.pending_statements_peers.remove(&hash) {
                    if let Some(result) = result {
                        peers.into_iter().for_each(|p| self.on_handle_statement_import(p, &result));
                    }
                } else {
                    log::warn!(target: LOG_TARGET, "Inconsistent state, no peers for pending statement!");
                }
            },
            // Peer connect/disconnect reported by the sync layer.
            sync_event = self.sync_event_stream.next() => {
                if let Some(sync_event) = sync_event {
                    self.handle_sync_event(sync_event);
                } else {
                    // Stream closed: terminate the handler.
                    return;
                }
            }
            // Notification protocol events (substreams, received payloads).
            event = self.notification_service.next_event().fuse() => {
                if let Some(event) = event {
                    self.handle_notification_event(event).await
                } else {
                    return
                }
            }
            // Time for the next initial-sync burst; re-arm the timer afterwards.
            _ = &mut self.initial_sync_timeout => {
                self.process_initial_sync_burst().await;
                self.initial_sync_timeout =
                    Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse());
            },
        }
    }
}
/// Encode and send the largest sendable prefix of `statements` to `peer`.
///
/// Returns how many statements were sent, or whether the chunk was empty,
/// skipped (single oversized statement) or the send failed / timed out.
async fn send_statement_chunk(
    &mut self,
    peer: &PeerId,
    statements: &[&Statement],
) -> SendChunkResult {
    match find_sendable_chunk(statements) {
        ChunkResult::Send(0) => SendChunkResult::Empty,
        ChunkResult::Send(chunk_end) => {
            let chunk = &statements[..chunk_end];
            let encoded = chunk.encode();
            let bytes_to_send = encoded.len() as u64;
            let sent_latency_timer =
                self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
            let send_result = timeout(
                SEND_TIMEOUT,
                self.notification_service.send_async_notification(peer, encoded),
            )
            .await;
            drop(sent_latency_timer);
            // Both the timeout (outer `Err`) and a network-level send error
            // (inner `Err`) are failures. Previously only the timeout was
            // checked, so a failed send was reported as `Sent` and the
            // statements were wrongly marked as known to the peer.
            match send_result {
                Err(e) => {
                    log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
                    return SendChunkResult::Failed;
                },
                Ok(Err(e)) => {
                    log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
                    return SendChunkResult::Failed;
                },
                Ok(Ok(())) => {},
            }
            log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
            self.metrics.as_ref().map(|metrics| {
                metrics.propagated_statements.inc_by(chunk.len() as u64);
                metrics.bytes_sent_total.inc_by(bytes_to_send);
                metrics.propagated_statements_chunks.observe(chunk.len() as f64);
            });
            SendChunkResult::Sent(chunk_end)
        },
        ChunkResult::SkipOversized => {
            log::warn!(target: LOG_TARGET, "Statement too large, skipping");
            self.metrics.as_ref().map(|metrics| {
                metrics.skipped_oversized_statements.inc();
            });
            SendChunkResult::Skipped
        },
    }
}
/// React to sync-layer peer connect/disconnect events by maintaining the
/// reserved-peer set of the statement protocol.
fn handle_sync_event(&mut self, event: SyncEvent) {
    match event {
        SyncEvent::PeerConnected(remote) => {
            // Address the peer by its identity alone.
            let peer_addr: multiaddr::Multiaddr =
                iter::once(multiaddr::Protocol::P2p(remote.into())).collect();
            if let Err(err) = self
                .network
                .add_peers_to_reserved_set(self.protocol_name.clone(), iter::once(peer_addr).collect())
            {
                log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
            }
        },
        SyncEvent::PeerDisconnected(remote) => {
            if let Err(err) = self
                .network
                .remove_peers_from_reserved_set(self.protocol_name.clone(), iter::once(remote).collect())
            {
                log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}");
            }
        },
    }
}
/// Handle a single event from the notification protocol service.
async fn handle_notification_event(&mut self, event: NotificationEvent) {
    match event {
        // Accept an inbound substream iff the handshake yields a known peer role.
        NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
            let result = self
                .network
                .peer_role(peer, handshake)
                .map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
            let _ = result_tx.send(result);
        },
        NotificationEvent::NotificationStreamOpened { peer, .. } => {
            // Track the new peer with a fresh known-statement LRU and a
            // per-peer rate limiter sized from the configured rate and burst
            // coefficient.
            let _was_in = self.peers.insert(
                peer,
                Peer {
                    known_statements: LruHashSet::new(
                        NonZeroUsize::new(MAX_KNOWN_STATEMENTS).expect("Constant is nonzero"),
                    ),
                    rate_limiter: PeerRateLimiter::new(
                        self.statements_per_second,
                        NonZeroU32::new(
                            self.statements_per_second.get()
                                * config::STATEMENTS_BURST_COEFFICIENT,
                        )
                        .expect("burst capacity is nonzero"),
                    ),
                },
            );
            debug_assert!(_was_in.is_none());
            self.metrics.as_ref().map(|metrics| {
                metrics.peers_connected.set(self.peers.len() as u64);
            });
            // Schedule an initial sync of all stored statements towards the
            // new peer, unless we are major-syncing or the store is empty.
            if !self.sync.is_major_syncing() {
                let hashes = self.statement_store.statement_hashes();
                if !hashes.is_empty() {
                    self.pending_initial_syncs.insert(
                        peer,
                        PendingInitialSync { hashes, started_at: Instant::now() },
                    );
                    self.initial_sync_peer_queue.push_back(peer);
                    self.metrics.as_ref().map(|metrics| {
                        metrics.initial_sync_peers_active.inc();
                    });
                }
            }
        },
        NotificationEvent::NotificationStreamClosed { peer } => {
            let _peer = self.peers.remove(&peer);
            debug_assert!(_peer.is_some());
            // Abort any in-flight initial sync for the disconnected peer and
            // record its (partial) duration.
            if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
                self.metrics.as_ref().map(|metrics| {
                    metrics.initial_sync_peers_active.dec();
                    metrics
                        .initial_sync_duration_seconds
                        .observe(pending.started_at.elapsed().as_secs_f64());
                });
            }
            self.initial_sync_peer_queue.retain(|p| *p != peer);
            self.metrics.as_ref().map(|metrics| {
                metrics.peers_connected.set(self.peers.len() as u64);
            });
        },
        NotificationEvent::NotificationReceived { peer, notification } => {
            let bytes_received = notification.len() as u64;
            self.metrics.as_ref().map(|metrics| {
                metrics.bytes_received_total.inc_by(bytes_received);
            });
            // Statements received during major sync are dropped (the bytes
            // metric above is still updated).
            if self.sync.is_major_syncing() {
                log::trace!(
                    target: LOG_TARGET,
                    "{peer}: Ignoring statements while major syncing or offline"
                );
                return;
            }
            if let Ok(statements) = <Statements as Decode>::decode(&mut notification.as_ref()) {
                self.on_statements(peer, statements);
            } else {
                log::debug!(target: LOG_TARGET, "Failed to decode statement list from {peer}");
            }
        },
    }
}
/// Handle a batch of statements received from peer `who`.
///
/// Rate-limits the peer (flooders are reported and disconnected), skips
/// statements already in the store (reporting duplicates from the same peer),
/// and enqueues the rest for asynchronous validation by the submission
/// workers.
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
    log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);
    self.metrics.as_ref().map(|metrics| {
        metrics.statements_received.inc_by(statements.len() as u64);
    });
    // Unknown peers (e.g. already disconnected) are ignored entirely.
    if let Some(ref mut peer) = self.peers.get_mut(&who) {
        // Charge the whole batch against the peer's rate limiter first.
        if peer.rate_limiter.is_flooding(statements.len()) {
            log::warn!(
                target: LOG_TARGET,
                "Peer {} exceeded statement rate limit ({} statements/sec). Disconnecting.",
                who,
                self.statements_per_second
            );
            self.network.report_peer(who, rep::STATEMENT_FLOODING);
            self.network.disconnect_peer(who, self.protocol_name.clone());
            if let Some(ref metrics) = self.metrics {
                metrics.statement_flooding_detected.inc();
            }
            // Drop all per-peer state, including any pending initial sync.
            self.peers.remove(&who);
            self.pending_initial_syncs.remove(&who);
            self.initial_sync_peer_queue.retain(|p| *p != who);
            return;
        }
        // NOTE(review): `statements_left` is only decremented at the end of the
        // loop body; the `continue` for already-known statements skips that, so
        // the count logged/reported on the break below can overestimate the
        // number of truly unprocessed statements — confirm intent.
        let mut statements_left = statements.len() as u64;
        for s in statements {
            // Backpressure: stop accepting once too many validations are queued.
            if self.pending_statements.len() > MAX_PENDING_STATEMENTS {
                log::debug!(
                    target: LOG_TARGET,
                    "Ignoring {} statements that exceed `MAX_PENDING_STATEMENTS`({}) limit",
                    statements_left,
                    MAX_PENDING_STATEMENTS,
                );
                self.metrics.as_ref().map(|metrics| {
                    metrics.ignored_statements.inc_by(statements_left);
                });
                break;
            }
            let hash = s.hash();
            // Whatever happens next, this peer now knows the statement.
            peer.known_statements.insert(hash);
            if self.statement_store.has_statement(&hash) {
                self.metrics.as_ref().map(|metrics| {
                    metrics.known_statements_received.inc();
                });
                // Punish only if this same peer already sent it and it is
                // still pending validation.
                if let Some(peers) = self.pending_statements_peers.get(&hash) {
                    if peers.contains(&who) {
                        log::trace!(
                            target: LOG_TARGET,
                            "Already received the statement from the same peer {who}.",
                        );
                        self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
                    }
                }
                continue;
            }
            // Up-front cost; refunded later if the statement turns out known.
            self.network.report_peer(who, rep::ANY_STATEMENT);
            match self.pending_statements_peers.entry(hash) {
                Entry::Vacant(entry) => {
                    // First sighting: hand the statement to a submission worker
                    // and track the completion as a pending future.
                    let (completion_sender, completion_receiver) = oneshot::channel();
                    match self.queue_sender.try_send((s, completion_sender)) {
                        Ok(()) => {
                            self.pending_statements.push(
                                async move {
                                    let res = completion_receiver.await;
                                    (hash, res.ok())
                                }
                                .boxed(),
                            );
                            entry.insert(HashSet::from_iter([who]));
                        },
                        Err(async_channel::TrySendError::Full(_)) => {
                            log::debug!(
                                target: LOG_TARGET,
                                "Dropped statement because validation channel is full",
                            );
                        },
                        Err(async_channel::TrySendError::Closed(_)) => {
                            log::trace!(
                                target: LOG_TARGET,
                                "Dropped statement because validation channel is closed",
                            );
                        },
                    }
                },
                Entry::Occupied(mut entry) => {
                    // Already pending: remember this sender too; a repeat from
                    // the same peer counts as a duplicate.
                    if !entry.get_mut().insert(who) {
                        self.network.report_peer(who, rep::DUPLICATE_STATEMENT);
                    }
                },
            }
            statements_left -= 1;
        }
    }
}
/// Adjust `who`'s reputation based on the outcome of importing one of its
/// statements.
fn on_handle_statement_import(&mut self, who: PeerId, import: &SubmitResult) {
    let reputation_change = match import {
        SubmitResult::New => Some(rep::GOOD_STATEMENT),
        // Refund the up-front `ANY_STATEMENT` cost for a known statement.
        SubmitResult::Known => Some(rep::ANY_STATEMENT_REFUND),
        SubmitResult::Invalid(_) => Some(rep::INVALID_STATEMENT),
        SubmitResult::KnownExpired | SubmitResult::Rejected(_) | SubmitResult::InternalError(_) =>
            None,
    };
    if let Some(change) = reputation_change {
        self.network.report_peer(who, change);
    }
}
/// Propagate a single statement, identified by `hash`, to all connected peers.
pub async fn propagate_statement(&mut self, hash: &Hash) {
    // Propagating while major-syncing would be wasted work.
    if self.sync.is_major_syncing() {
        return;
    }
    log::debug!(target: LOG_TARGET, "Propagating statement [{:?}]", hash);
    // Nothing to do if the statement is missing or the store errored.
    let Ok(Some(statement)) = self.statement_store.statement(hash) else {
        return;
    };
    self.do_propagate_statements(&[(*hash, statement)]).await;
}
/// Send to `who` every statement in `statements` it does not already know,
/// marking all of them as known to that peer.
async fn send_statements_to_peer(&mut self, who: &PeerId, statements: &[(Hash, Statement)]) {
    let Some(peer) = self.peers.get_mut(who) else {
        return;
    };
    let mut unknown = Vec::new();
    for (hash, statement) in statements {
        // `insert` returns true only for hashes the peer has not seen yet.
        if peer.known_statements.insert(*hash) {
            unknown.push(statement);
        }
    }
    log::trace!(target: LOG_TARGET, "We have {} statements that the peer doesn't know about", unknown.len());
    if !unknown.is_empty() {
        self.send_statements_in_chunks(who, &unknown).await;
    }
}
/// Send `statements` to `who` in as many notification-sized chunks as needed,
/// dropping single oversized statements and stopping on send failure.
async fn send_statements_in_chunks(&mut self, who: &PeerId, statements: &[&Statement]) {
    let mut remaining = statements;
    while !remaining.is_empty() {
        let advance = match self.send_statement_chunk(who, remaining).await {
            SendChunkResult::Sent(sent) => sent,
            // A statement too large to ever fit: skip it and continue.
            SendChunkResult::Skipped => 1,
            SendChunkResult::Empty | SendChunkResult::Failed => return,
        };
        remaining = &remaining[advance..];
    }
}
async fn do_propagate_statements(&mut self, statements: &[(Hash, Statement)]) {
log::debug!(target: LOG_TARGET, "Propagating {} statements for {} peers", statements.len(), self.peers.len());
let peers: Vec<_> = self.peers.keys().copied().collect();
for who in peers {
log::trace!(target: LOG_TARGET, "Start propagating statements for {}", who);
self.send_statements_to_peer(&who, statements).await;
}
log::trace!(target: LOG_TARGET, "Statements propagated to all peers");
}
/// Drain recently submitted statements from the store and gossip them.
async fn propagate_statements(&mut self) {
    // No gossip while the node is still major-syncing.
    if self.sync.is_major_syncing() {
        return;
    }
    let statements = match self.statement_store.take_recent_statements() {
        Ok(statements) => statements,
        Err(_) => return,
    };
    if statements.is_empty() {
        return;
    }
    self.do_propagate_statements(&statements).await;
}
/// Update metrics when a peer's initial sync finishes (or is aborted).
fn record_initial_sync_completion(&self, started_at: Instant) {
    if let Some(metrics) = &self.metrics {
        metrics.initial_sync_peers_active.dec();
        metrics.initial_sync_duration_seconds.observe(started_at.elapsed().as_secs_f64());
    }
}
/// Process one initial-sync burst: pop the next peer from the round-robin
/// queue and send it the next chunk of stored statements that fits into a
/// single notification.
async fn process_initial_sync_burst(&mut self) {
    if self.sync.is_major_syncing() {
        return;
    }
    let Some(peer_id) = self.initial_sync_peer_queue.pop_front() else {
        return;
    };
    let Entry::Occupied(mut entry) = self.pending_initial_syncs.entry(peer_id) else {
        return;
    };
    self.metrics.as_ref().map(|metrics| {
        metrics.initial_sync_bursts_total.inc();
    });
    // Nothing left to send: this peer's initial sync is complete.
    if entry.get().hashes.is_empty() {
        let started_at = entry.get().started_at;
        entry.remove();
        self.record_initial_sync_completion(started_at);
        return;
    }
    let max_size = max_statement_payload_size();
    let mut accumulated_size = 0;
    // Take statements from the front of the pending hash list until the next
    // one would overflow a single notification payload.
    let (statements, processed) = match self.statement_store.statements_by_hashes(
        &entry.get().hashes,
        &mut |_hash, encoded, _stmt| {
            if accumulated_size > 0 && accumulated_size + encoded.len() > max_size {
                return FilterDecision::Abort;
            }
            accumulated_size += encoded.len();
            FilterDecision::Take
        },
    ) {
        Ok(r) => r,
        Err(e) => {
            // Store error: abandon this peer's initial sync entirely.
            log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
            let started_at = entry.get().started_at;
            entry.remove();
            self.record_initial_sync_completion(started_at);
            return;
        },
    };
    // Drop the hashes that were handled (found in the store or not).
    entry.get_mut().hashes.drain(..processed);
    let has_more = !entry.get().hashes.is_empty();
    drop(entry);
    let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
    match self.send_statement_chunk(&peer_id, &to_send).await {
        SendChunkResult::Failed => {
            // Abort the sync for this peer on send failure.
            if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
                self.record_initial_sync_completion(pending.started_at);
            }
            return;
        },
        SendChunkResult::Sent(sent) => {
            // The chunk was sized to fit, so everything should have gone out.
            debug_assert_eq!(to_send.len(), sent);
            self.metrics.as_ref().map(|metrics| {
                metrics.initial_sync_statements_sent.inc_by(sent as u64);
            });
            // Remember what the peer now knows to avoid re-gossiping it.
            if let Some(peer) = self.peers.get_mut(&peer_id) {
                for (hash, _) in &statements {
                    peer.known_statements.insert(*hash);
                }
            }
        },
        SendChunkResult::Empty | SendChunkResult::Skipped => {},
    }
    // Re-queue the peer if more statements remain, otherwise record completion.
    if has_more {
        self.initial_sync_peer_queue.push_back(peer_id);
    } else {
        if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
            self.record_initial_sync_completion(pending.started_at);
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
// Test double for the network: records reputation reports and disconnects.
#[derive(Clone)]
struct TestNetwork {
    reported_peers: Arc<Mutex<Vec<(PeerId, soil_network::ReputationChange)>>>,
    disconnected_peers: Arc<Mutex<Vec<PeerId>>>,
}
impl TestNetwork {
    fn new() -> Self {
        Self {
            reported_peers: Arc::new(Mutex::new(Vec::new())),
            disconnected_peers: Arc::new(Mutex::new(Vec::new())),
        }
    }
    // Snapshot of all reputation changes reported so far.
    fn get_reports(&self) -> Vec<(PeerId, soil_network::ReputationChange)> {
        self.reported_peers.lock().unwrap().clone()
    }
    // Snapshot of all peers disconnected so far.
    fn get_disconnected_peers(&self) -> Vec<PeerId> {
        self.disconnected_peers.lock().unwrap().clone()
    }
}
// Only `report_peer` and `disconnect_peer` are exercised by these tests; all
// other trait methods are intentionally left unimplemented.
#[async_trait::async_trait]
impl NetworkPeers for TestNetwork {
    fn set_authorized_peers(&self, _: std::collections::HashSet<PeerId>) {
        unimplemented!()
    }
    fn set_authorized_only(&self, _: bool) {
        unimplemented!()
    }
    fn add_known_address(&self, _: PeerId, _: soil_network::Multiaddr) {
        unimplemented!()
    }
    // Recorded so tests can assert on reputation changes.
    fn report_peer(&self, peer_id: PeerId, cost_benefit: soil_network::ReputationChange) {
        self.reported_peers.lock().unwrap().push((peer_id, cost_benefit));
    }
    fn peer_reputation(&self, _: &PeerId) -> i32 {
        unimplemented!()
    }
    // Recorded so tests can assert on disconnects.
    fn disconnect_peer(&self, peer: PeerId, _: soil_network::ProtocolName) {
        self.disconnected_peers.lock().unwrap().push(peer);
    }
    fn accept_unreserved_peers(&self) {
        unimplemented!()
    }
    fn deny_unreserved_peers(&self) {
        unimplemented!()
    }
    fn add_reserved_peer(
        &self,
        _: soil_network::config::MultiaddrWithPeerId,
    ) -> Result<(), String> {
        unimplemented!()
    }
    fn remove_reserved_peer(&self, _: PeerId) {
        unimplemented!()
    }
    fn set_reserved_peers(
        &self,
        _: soil_network::ProtocolName,
        _: std::collections::HashSet<soil_network::Multiaddr>,
    ) -> Result<(), String> {
        unimplemented!()
    }
    fn add_peers_to_reserved_set(
        &self,
        _: soil_network::ProtocolName,
        _: std::collections::HashSet<soil_network::Multiaddr>,
    ) -> Result<(), String> {
        unimplemented!()
    }
    fn remove_peers_from_reserved_set(
        &self,
        _: soil_network::ProtocolName,
        _: Vec<PeerId>,
    ) -> Result<(), String> {
        unimplemented!()
    }
    fn sync_num_connected(&self) -> usize {
        unimplemented!()
    }
    fn peer_role(&self, _: PeerId, _: Vec<u8>) -> Option<soil_network::ObservedRole> {
        unimplemented!()
    }
    async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
        unimplemented!();
    }
}
// Sync test double: reports "not major syncing" so statements are processed.
struct TestSync {}
impl SyncEventStream for TestSync {
    fn event_stream(
        &self,
        _name: &'static str,
    ) -> Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>> {
        unimplemented!()
    }
}
impl soil_client::consensus::SyncOracle for TestSync {
    fn is_major_syncing(&self) -> bool {
        false
    }
    fn is_offline(&self) -> bool {
        unimplemented!()
    }
}
// Required by the `StatementHandler` bounds; not exercised in these tests.
impl NetworkEventStream for TestNetwork {
    fn event_stream(
        &self,
        _name: &'static str,
    ) -> Pin<Box<dyn Stream<Item = soil_network::Event> + Send>> {
        unimplemented!()
    }
}
// Notification-service test double: records sent notifications for assertions.
#[derive(Debug, Clone)]
struct TestNotificationService {
    sent_notifications: Arc<Mutex<Vec<(PeerId, Vec<u8>)>>>,
}
impl TestNotificationService {
    fn new() -> Self {
        Self { sent_notifications: Arc::new(Mutex::new(Vec::new())) }
    }
    // Snapshot of all notifications sent so far.
    fn get_sent_notifications(&self) -> Vec<(PeerId, Vec<u8>)> {
        self.sent_notifications.lock().unwrap().clone()
    }
}
#[async_trait::async_trait]
impl NotificationService for TestNotificationService {
    async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
        unimplemented!()
    }
    async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
        unimplemented!()
    }
    // Records the notification instead of sending it.
    fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>) {
        self.sent_notifications.lock().unwrap().push((*peer, notification));
    }
    // Records the notification instead of sending it; always succeeds.
    async fn send_async_notification(
        &mut self,
        peer: &PeerId,
        notification: Vec<u8>,
    ) -> Result<(), soil_network::error::Error> {
        self.sent_notifications.lock().unwrap().push((*peer, notification));
        Ok(())
    }
    async fn set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
        unimplemented!()
    }
    fn try_set_handshake(&mut self, _handshake: Vec<u8>) -> Result<(), ()> {
        unimplemented!()
    }
    // Always reports a closed event stream, so `run` terminates immediately.
    async fn next_event(&mut self) -> Option<soil_network::service::traits::NotificationEvent> {
        None
    }
    fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
        unimplemented!()
    }
    fn protocol(&self) -> &soil_network::types::ProtocolName {
        unimplemented!()
    }
    fn message_sink(
        &self,
        _peer: &PeerId,
    ) -> Option<Box<dyn soil_network::service::traits::MessageSink>> {
        unimplemented!()
    }
}
// In-memory statement-store test double backed by two hash maps: the full set
// of statements and the "recent" subset drained by propagation.
#[derive(Clone)]
struct TestStatementStore {
    statements:
        Arc<Mutex<HashMap<soil_statement_store::Hash, soil_statement_store::Statement>>>,
    recent_statements:
        Arc<Mutex<HashMap<soil_statement_store::Hash, soil_statement_store::Statement>>>,
}
impl TestStatementStore {
    fn new() -> Self {
        Self { statements: Default::default(), recent_statements: Default::default() }
    }
}
impl StatementStore for TestStatementStore {
    fn statements(
        &self,
    ) -> soil_statement_store::Result<
        Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
    > {
        Ok(self.statements.lock().unwrap().iter().map(|(h, s)| (*h, s.clone())).collect())
    }
    // Drains (and thus empties) the "recent" set, mirroring the real store.
    fn take_recent_statements(
        &self,
    ) -> soil_statement_store::Result<
        Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
    > {
        Ok(self.recent_statements.lock().unwrap().drain().collect())
    }
    fn statement(
        &self,
        _hash: &soil_statement_store::Hash,
    ) -> soil_statement_store::Result<Option<soil_statement_store::Statement>> {
        unimplemented!()
    }
    fn has_statement(&self, hash: &soil_statement_store::Hash) -> bool {
        self.statements.lock().unwrap().contains_key(hash)
    }
    fn statement_hashes(&self) -> Vec<soil_statement_store::Hash> {
        self.statements.lock().unwrap().keys().cloned().collect()
    }
    // Walks `hashes` in order, applying `filter` to each statement found in
    // the store; returns the taken statements plus how many hashes were
    // processed (missing hashes count as processed; `Abort` stops early).
    fn statements_by_hashes(
        &self,
        hashes: &[soil_statement_store::Hash],
        filter: &mut dyn FnMut(
            &soil_statement_store::Hash,
            &[u8],
            &soil_statement_store::Statement,
        ) -> FilterDecision,
    ) -> soil_statement_store::Result<(
        Vec<(soil_statement_store::Hash, soil_statement_store::Statement)>,
        usize,
    )> {
        let statements = self.statements.lock().unwrap();
        let mut result = Vec::new();
        let mut processed = 0;
        for hash in hashes {
            let Some(stmt) = statements.get(hash) else {
                processed += 1;
                continue;
            };
            let encoded = stmt.encode();
            match filter(hash, &encoded, stmt) {
                FilterDecision::Skip => {
                    processed += 1;
                },
                FilterDecision::Take => {
                    processed += 1;
                    result.push((*hash, stmt.clone()));
                },
                FilterDecision::Abort => break,
            }
        }
        Ok((result, processed))
    }
    fn broadcasts(
        &self,
        _match_all_topics: &[soil_statement_store::Topic],
    ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted(
        &self,
        _match_all_topics: &[soil_statement_store::Topic],
        _dest: [u8; 32],
    ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted_clear(
        &self,
        _match_all_topics: &[soil_statement_store::Topic],
        _dest: [u8; 32],
    ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn broadcasts_stmt(
        &self,
        _match_all_topics: &[soil_statement_store::Topic],
    ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted_stmt(
        &self,
        _match_all_topics: &[soil_statement_store::Topic],
        _dest: [u8; 32],
    ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn posted_clear_stmt(
        &self,
        _match_all_topics: &[soil_statement_store::Topic],
        _dest: [u8; 32],
    ) -> soil_statement_store::Result<Vec<Vec<u8>>> {
        unimplemented!()
    }
    fn submit(
        &self,
        _statement: soil_statement_store::Statement,
        _source: soil_statement_store::StatementSource,
    ) -> soil_statement_store::SubmitResult {
        unimplemented!()
    }
    fn remove(&self, _hash: &soil_statement_store::Hash) -> soil_statement_store::Result<()> {
        unimplemented!()
    }
    fn remove_by(&self, _who: [u8; 32]) -> soil_statement_store::Result<()> {
        unimplemented!()
    }
}
/// Wires a [`StatementHandler`] to test doubles, pre-populated with a single
/// random peer, and returns the handler together with its collaborators and
/// the receiving end of the submission queue.
fn build_handler() -> (
    StatementHandler<TestNetwork, TestSync>,
    TestStatementStore,
    TestNetwork,
    TestNotificationService,
    async_channel::Receiver<(Statement, oneshot::Sender<SubmitResult>)>,
) {
    let store = TestStatementStore::new();
    let network = TestNetwork::new();
    let notifications = TestNotificationService::new();
    let (queue_sender, queue_receiver) = async_channel::bounded(2);

    // One pre-connected peer carrying the default rate limiter.
    let rate = NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
        .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero");
    let burst =
        NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT)
            .expect("burst capacity is nonzero");
    let peer = Peer {
        known_statements: LruHashSet::new(NonZeroUsize::new(100).unwrap()),
        rate_limiter: PeerRateLimiter::new(rate, burst),
    };
    let peers: HashMap<_, _> = iter::once((PeerId::random(), peer)).collect();

    let handler = StatementHandler {
        protocol_name: "/statement/1".into(),
        notification_service: Box::new(notifications.clone()),
        // Never fires: propagation is driven manually in the tests.
        propagate_timeout: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = ()> + Send>>)
            .fuse(),
        pending_statements: FuturesUnordered::new(),
        pending_statements_peers: HashMap::new(),
        network: network.clone(),
        sync: TestSync {},
        // Never yields: sync events are not simulated here.
        sync_event_stream: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>>)
            .fuse(),
        peers,
        statement_store: Arc::new(store.clone()),
        queue_sender,
        statements_per_second: rate,
        metrics: None,
        initial_sync_timeout: Box::pin(futures::future::pending()),
        pending_initial_syncs: HashMap::new(),
        initial_sync_peer_queue: VecDeque::new(),
    };
    (handler, store, network, notifications, queue_receiver)
}
/// A statement already present in the store must not be queued again;
/// only genuinely new statements reach the submission queue.
#[tokio::test]
async fn test_skips_processing_statements_that_already_in_store() {
    let (mut handler, store, _network, _notifications, queue_receiver) = build_handler();

    // `known` is pre-inserted into the store; `fresh` is unknown.
    let mut known = Statement::new();
    known.set_plain_data(b"statement1".to_vec());
    store.statements.lock().unwrap().insert(known.hash(), known.clone());

    let mut fresh = Statement::new();
    fresh.set_plain_data(b"statement2".to_vec());
    let fresh_hash = fresh.hash();

    let peer = *handler.peers.keys().next().unwrap();
    handler.on_statements(peer, vec![known, fresh]);

    let queued = queue_receiver.try_recv();
    assert_eq!(queued.unwrap().0.hash(), fresh_hash, "Expected only statement2 to be queued");
    assert!(queue_receiver.try_recv().is_err(), "Expected only one statement to be queued");
}
/// Re-sending an already-stored statement must cost the peer the
/// DUPLICATE_STATEMENT reputation penalty.
#[tokio::test]
async fn test_reports_for_duplicate_statements() {
    let (mut handler, store, network, _notifications, queue_receiver) = build_handler();
    let peer = *handler.peers.keys().next().unwrap();

    let mut statement = Statement::new();
    statement.set_plain_data(b"statement1".to_vec());

    // First delivery: the statement is new and gets queued for submission.
    handler.on_statements(peer, vec![statement.clone()]);
    {
        // Move the queued statement into the store, as the real submission
        // pipeline would, and simulate the refund issued on acceptance.
        let (queued, _) = queue_receiver.try_recv().unwrap();
        let _ = store.statements.lock().unwrap().insert(queued.hash(), queued);
        handler.network.report_peer(peer, rep::ANY_STATEMENT_REFUND);
    }

    // Second delivery of the very same statement is now a duplicate.
    handler.on_statements(peer, vec![statement]);

    let reports = network.get_reports();
    assert_eq!(
        reports,
        vec![
            (peer, rep::ANY_STATEMENT),
            (peer, rep::ANY_STATEMENT_REFUND),
            (peer, rep::DUPLICATE_STATEMENT),
        ],
        "Expected ANY_STATEMENT, ANY_STATEMENT_REFUND, DUPLICATE_STATEMENT reputation change, but got: {:?}",
        reports
    );
}
/// A batch too large for one notification must be split across several
/// notifications, each within the size limit, with nothing lost.
#[tokio::test]
async fn test_splits_large_batches_into_smaller_chunks() {
    let (mut handler, store, _network, notifications, _queue_receiver) = build_handler();

    // 30 statements of ~100 KiB each cannot fit into a single notification.
    let num_statements = 30;
    let statement_size = 100 * 1024;
    {
        let mut recent = store.recent_statements.lock().unwrap();
        for i in 0..num_statements {
            // First byte varies so every statement hashes differently.
            let mut payload = vec![0u8; statement_size];
            payload[0] = i as u8;
            let mut statement = Statement::new();
            statement.set_plain_data(payload);
            recent.insert(statement.hash(), statement);
        }
    }

    handler.propagate_statements().await;

    let sent = notifications.get_sent_notifications();
    assert!(
        sent.len() == 3,
        "Expected batch to be split into 3 chunks, but got {} chunks",
        sent.len()
    );

    let mut total_statements_sent = 0;
    for (_peer, notification) in sent.iter() {
        assert!(
            notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
            "Notification size {} exceeds limit {}",
            notification.len(),
            MAX_STATEMENT_NOTIFICATION_SIZE
        );
        if let Ok(stmts) = <Statements as Decode>::decode(&mut notification.as_slice()) {
            total_statements_sent += stmts.len();
        }
    }
    assert_eq!(
        total_statements_sent, num_statements,
        "Expected all {} statements to be sent, but only {} were sent",
        num_statements, total_statements_sent
    );
}
/// Statements that individually exceed the notification size limit are
/// dropped from propagation, while every other statement still goes out.
#[tokio::test]
async fn test_skips_only_oversized_statements() {
    let (mut handler, store, _network, notifications, _queue_receiver) = build_handler();

    // Inserts a statement with the given payload into `recent_statements`
    // and returns its hash.
    let insert_with_payload = |payload: Vec<u8>| {
        let mut statement = Statement::new();
        statement.set_plain_data(payload);
        let hash = statement.hash();
        store.recent_statements.lock().unwrap().insert(hash, statement);
        hash
    };

    // Interleave small statements with two oversized ones.
    let hash1 = insert_with_payload(vec![1u8; 100]);
    let _skipped1 =
        insert_with_payload(vec![2u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize * 100]);
    let hash2 = insert_with_payload(vec![3u8; 100]);
    let _skipped2 = insert_with_payload(vec![4u8; MAX_STATEMENT_NOTIFICATION_SIZE as usize]);
    let hash3 = insert_with_payload(vec![5u8; 100]);

    handler.propagate_statements().await;

    let sent = notifications.get_sent_notifications();
    let mut sent_hashes = sent
        .iter()
        .flat_map(|(_peer, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect::<Vec<_>>();
    sent_hashes.sort();

    let mut expected_hashes = vec![hash1, hash2, hash3];
    expected_hashes.sort();
    assert_eq!(sent_hashes, expected_hashes, "Only small statements should be sent");
}
/// Like `build_handler`, but starts with an empty peer set; used by the
/// initial-sync tests, which connect peers through notification events.
fn build_handler_no_peers() -> (
    StatementHandler<TestNetwork, TestSync>,
    TestStatementStore,
    TestNetwork,
    TestNotificationService,
) {
    let store = TestStatementStore::new();
    let network = TestNetwork::new();
    let notifications = TestNotificationService::new();
    // The receiving side is dropped: these tests never submit statements.
    let (queue_sender, _queue_receiver) = async_channel::bounded(2);
    let handler = StatementHandler {
        protocol_name: "/statement/1".into(),
        notification_service: Box::new(notifications.clone()),
        // Never fires: propagation is driven manually in the tests.
        propagate_timeout: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = ()> + Send>>)
            .fuse(),
        pending_statements: FuturesUnordered::new(),
        pending_statements_peers: HashMap::new(),
        network: network.clone(),
        sync: TestSync {},
        // Never yields: sync events are not simulated here.
        sync_event_stream: (Box::pin(futures::stream::pending())
            as Pin<Box<dyn Stream<Item = soil_network::sync::types::SyncEvent> + Send>>)
            .fuse(),
        peers: HashMap::new(),
        statement_store: Arc::new(store.clone()),
        queue_sender,
        statements_per_second: NonZeroU32::new(DEFAULT_STATEMENTS_PER_SECOND)
            .expect("DEFAULT_STATEMENTS_PER_SECOND is nonzero"),
        metrics: None,
        initial_sync_timeout: Box::pin(futures::future::pending()),
        pending_initial_syncs: HashMap::new(),
        initial_sync_peer_queue: VecDeque::new(),
    };
    (handler, store, network, notifications)
}
/// A newly connected peer receives the full store contents via multiple
/// initial-sync bursts, after which it is removed from the sync queue.
#[tokio::test]
async fn test_initial_sync_burst_single_peer() {
    let (mut handler, store, _network, notifications) = build_handler_no_peers();

    // Fill the store with 200 statements of ~100 KiB each, so the initial
    // sync cannot complete in one burst.
    let num_statements = 200;
    let statement_size = 100 * 1024;
    let mut expected_hashes = Vec::new();
    for i in 0..num_statements {
        // Two varying bytes keep all 200 payloads (and hashes) distinct.
        let mut data = vec![0u8; statement_size];
        data[0] = (i % 256) as u8;
        data[1] = (i / 256) as u8;
        let mut statement = Statement::new();
        statement.set_plain_data(data);
        let hash = statement.hash();
        expected_hashes.push(hash);
        store.statements.lock().unwrap().insert(hash, statement);
    }

    // Connecting a peer schedules it for an initial sync.
    let peer_id = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer: peer_id,
            direction: soil_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;
    assert!(handler.peers.contains_key(&peer_id));
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.initial_sync_peer_queue.len(), 1);

    // Drive bursts until the sync for this peer completes.
    let mut burst_count = 0;
    while handler.pending_initial_syncs.contains_key(&peer_id) {
        handler.process_initial_sync_burst().await;
        burst_count += 1;
        assert!(burst_count <= 300, "Too many bursts, possible infinite loop");
    }
    assert!(
        burst_count >= 10,
        "Expected multiple bursts for 200 statements of 100KB each, got {}",
        burst_count
    );

    // Every stored statement must have been sent to exactly this peer.
    let sent = notifications.get_sent_notifications();
    let mut sent_hashes = sent
        .iter()
        .flat_map(|(peer, notification)| {
            assert_eq!(*peer, peer_id);
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect::<Vec<_>>();
    sent_hashes.sort();
    expected_hashes.sort();
    assert_eq!(
        sent_hashes.len(),
        expected_hashes.len(),
        "Expected {} statements to be sent, got {}",
        expected_hashes.len(),
        sent_hashes.len()
    );
    assert_eq!(sent_hashes, expected_hashes, "All statements should be sent");
    assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
    assert!(handler.initial_sync_peer_queue.is_empty());
}
#[tokio::test]
async fn test_initial_sync_burst_multiple_peers_round_robin() {
let (mut handler, statement_store, _network, notification_service) =
build_handler_no_peers();
let num_statements = 200;
let statement_size = 100 * 1024; let mut expected_hashes = Vec::new();
for i in 0..num_statements {
let mut statement = Statement::new();
let mut data = vec![0u8; statement_size];
data[0] = (i % 256) as u8;
data[1] = (i / 256) as u8;
statement.set_plain_data(data);
let hash = statement.hash();
expected_hashes.push(hash);
statement_store.statements.lock().unwrap().insert(hash, statement);
}
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let peer3 = PeerId::random();
for peer in [peer1, peer2, peer3] {
handler
.handle_notification_event(NotificationEvent::NotificationStreamOpened {
peer,
direction: soil_network::service::traits::Direction::Inbound,
handshake: vec![],
negotiated_fallback: None,
})
.await;
}
assert_eq!(handler.peers.len(), 3);
assert_eq!(handler.pending_initial_syncs.len(), 3);
assert_eq!(handler.initial_sync_peer_queue.len(), 3);
let mut peer_burst_order = Vec::new();
let mut burst_count = 0;
while !handler.pending_initial_syncs.is_empty() {
if let Some(&next_peer) = handler.initial_sync_peer_queue.front() {
peer_burst_order.push(next_peer);
}
handler.process_initial_sync_burst().await;
burst_count += 1;
assert!(burst_count <= 500, "Too many bursts, possible infinite loop");
}
assert!(
burst_count >= 30,
"Expected many bursts for 3 peers with 200 statements each, got {}",
burst_count
);
assert!(peer_burst_order.len() >= 9, "Expected at least 9 bursts");
assert_eq!(peer_burst_order[0], peer1, "First burst should be peer1");
assert_eq!(peer_burst_order[1], peer2, "Second burst should be peer2");
assert_eq!(peer_burst_order[2], peer3, "Third burst should be peer3");
assert_eq!(peer_burst_order[3], peer1, "Fourth burst should be peer1");
assert_eq!(peer_burst_order[4], peer2, "Fifth burst should be peer2");
assert_eq!(peer_burst_order[5], peer3, "Sixth burst should be peer3");
let sent = notification_service.get_sent_notifications();
let mut peer1_hashes: Vec<_> = sent
.iter()
.filter(|(peer, _)| *peer == peer1)
.flat_map(|(_, notification)| {
<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
})
.map(|s| s.hash())
.collect();
let mut peer2_hashes: Vec<_> = sent
.iter()
.filter(|(peer, _)| *peer == peer2)
.flat_map(|(_, notification)| {
<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
})
.map(|s| s.hash())
.collect();
let mut peer3_hashes: Vec<_> = sent
.iter()
.filter(|(peer, _)| *peer == peer3)
.flat_map(|(_, notification)| {
<Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
})
.map(|s| s.hash())
.collect();
peer1_hashes.sort();
peer2_hashes.sort();
peer3_hashes.sort();
expected_hashes.sort();
assert_eq!(peer1_hashes, expected_hashes, "Peer1 should receive all statements");
assert_eq!(peer2_hashes, expected_hashes, "Peer2 should receive all statements");
assert_eq!(peer3_hashes, expected_hashes, "Peer3 should receive all statements");
assert!(handler.pending_initial_syncs.is_empty());
assert!(handler.initial_sync_peer_queue.is_empty());
}
/// A batch whose encoded size is exactly the per-notification budget must be
/// sent as one single notification containing all statements.
#[tokio::test]
async fn test_send_statements_in_chunks_exact_max_size() {
    let (mut handler, statement_store, _network, notification_service, _queue_receiver) =
        build_handler();
    // Budget for the encoded statements: the notification limit minus the
    // worst-case compact length prefix of the outer `Statements` vector.
    let max_size = MAX_STATEMENT_NOTIFICATION_SIZE as usize - Compact::<u32>::max_encoded_len();
    let num_statements: usize = 100;
    // Per-statement encoding overhead on top of the raw payload bytes.
    // NOTE(review): assumed to match the statement's field/length prefixes —
    // the equality assertion below verifies the arithmetic actually holds.
    let per_statement_overhead = 1 + 1 + 8 + 1 + 2;
    let total_overhead = per_statement_overhead * num_statements;
    let total_data_size = max_size - total_overhead;
    let per_statement_data_size = total_data_size / num_statements;
    let remainder = total_data_size % num_statements;
    let mut expected_hashes = Vec::with_capacity(num_statements);
    let mut total_encoded_size = 0;
    for i in 0..num_statements {
        let mut statement = Statement::new();
        // Spread the division remainder over the first statements so the
        // whole batch encodes to exactly `max_size` bytes.
        let extra = if i < remainder { 1 } else { 0 };
        let mut data = vec![42u8; per_statement_data_size + extra];
        // Two varying bytes keep every payload (and hash) distinct.
        data[0] = i as u8;
        data[1] = (i >> 8) as u8;
        statement.set_plain_data(data);
        total_encoded_size += statement.encoded_size();
        let hash = statement.hash();
        expected_hashes.push(hash);
        statement_store.recent_statements.lock().unwrap().insert(hash, statement);
    }
    // The test is only meaningful if the batch fills the budget exactly.
    // (Fix: message previously claimed "<=" while the condition requires
    // exact equality.)
    assert!(
        total_encoded_size == max_size,
        "Total encoded size {} should be exactly max_size {}",
        total_encoded_size,
        max_size
    );
    handler.propagate_statements().await;
    let sent = notification_service.get_sent_notifications();
    assert_eq!(
        sent.len(),
        1,
        "Expected 1 notification for all {} statements, but got {}",
        num_statements,
        sent.len()
    );
    let (_peer, notification) = &sent[0];
    assert!(
        notification.len() <= MAX_STATEMENT_NOTIFICATION_SIZE as usize,
        "Notification size {} exceeds limit {}",
        notification.len(),
        MAX_STATEMENT_NOTIFICATION_SIZE
    );
    let decoded = <Statements as Decode>::decode(&mut notification.as_slice()).unwrap();
    assert_eq!(
        decoded.len(),
        num_statements,
        "Expected {} statements in the notification",
        num_statements
    );
    let mut received_hashes: Vec<_> = decoded.iter().map(|s| s.hash()).collect();
    expected_hashes.sort();
    received_hashes.sort();
    assert_eq!(expected_hashes, received_hashes, "All statement hashes should match");
}
/// Checks that the size filter used when assembling an initial-sync burst is
/// consistent with the sending limit: a statement that does not fit into the
/// current burst is deferred to the next burst rather than dropped.
#[tokio::test]
async fn test_initial_sync_burst_size_limit_consistency() {
    let (mut handler, statement_store, _network, notification_service) =
        build_handler_no_peers();
    let payload_limit = max_statement_payload_size();
    // First statement takes a bit more than half of the payload budget.
    let first_stmt_data_size = payload_limit / 2 + 10;
    let mut stmt1 = Statement::new();
    stmt1.set_plain_data(vec![1u8; first_stmt_data_size]);
    let stmt1_encoded_size = stmt1.encoded_size();
    let remaining = payload_limit.saturating_sub(stmt1_encoded_size);
    // Aim the second statement's encoded size slightly past the remaining
    // budget, so the pair together exceeds the limit.
    let target_stmt2_encoded = remaining + 3;
    // NOTE(review): the `- 4` presumably compensates for the statement's
    // encoding overhead (field/length prefixes) when converting a target
    // encoded size into a raw data size — confirm against
    // `Statement::encoded_size`.
    let stmt2_data_size = target_stmt2_encoded.saturating_sub(4);
    let mut stmt2 = Statement::new();
    stmt2.set_plain_data(vec![2u8; stmt2_data_size]);
    let stmt2_encoded_size = stmt2.encoded_size();
    let total_encoded = stmt1_encoded_size + stmt2_encoded_size;
    // Precondition: the construction above must actually overflow the budget.
    assert!(
        total_encoded > payload_limit,
        "Total {} should exceed payload_limit {} so filter rejects second statement",
        total_encoded,
        payload_limit
    );
    let hash1 = stmt1.hash();
    let hash2 = stmt2.hash();
    statement_store.statements.lock().unwrap().insert(hash1, stmt1);
    statement_store.statements.lock().unwrap().insert(hash2, stmt2);
    // Connecting the peer schedules both statements for initial sync.
    let peer_id = PeerId::random();
    handler
        .handle_notification_event(NotificationEvent::NotificationStreamOpened {
            peer: peer_id,
            direction: soil_network::service::traits::Direction::Inbound,
            handshake: vec![],
            negotiated_fallback: None,
        })
        .await;
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 2);
    // First burst: only one statement fits; the other must stay pending.
    handler.process_initial_sync_burst().await;
    let sent = notification_service.get_sent_notifications();
    assert_eq!(sent.len(), 1, "First burst should send one notification");
    let decoded = <Statements as Decode>::decode(&mut sent[0].1.as_slice()).unwrap();
    assert_eq!(decoded.len(), 1, "First notification should contain one statement");
    let sent_hash = decoded[0].hash();
    assert!(
        sent_hash == hash1 || sent_hash == hash2,
        "Sent statement should be one of the two created"
    );
    assert!(handler.pending_initial_syncs.contains_key(&peer_id));
    assert_eq!(handler.pending_initial_syncs.get(&peer_id).unwrap().hashes.len(), 1);
    // Second burst: the deferred statement goes out and the sync completes.
    handler.process_initial_sync_burst().await;
    let sent = notification_service.get_sent_notifications();
    assert_eq!(sent.len(), 2, "Second burst should send another notification");
    let mut sent_hashes: Vec<_> = sent
        .iter()
        .flat_map(|(_, notification)| {
            <Statements as Decode>::decode(&mut notification.as_slice()).unwrap()
        })
        .map(|s| s.hash())
        .collect();
    sent_hashes.sort();
    let mut expected_hashes = vec![hash1, hash2];
    expected_hashes.sort();
    assert_eq!(sent_hashes, expected_hashes, "Both statements should be sent");
    assert!(!handler.pending_initial_syncs.contains_key(&peer_id));
}
/// A peer sending vastly more statements than the rate limit allows must be
/// reported for flooding, disconnected, and purged from all handler state.
#[tokio::test]
async fn test_peer_disconnected_on_flooding() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
        build_handler();
    let peer_id = *handler.peers.keys().next().unwrap();

    // 600,000 distinct statements delivered in one call.
    let flood: Vec<_> = (0..600_000)
        .map(|i| {
            let mut statement = Statement::new();
            statement.set_plain_data(vec![i as u8, (i >> 8) as u8, (i >> 16) as u8]);
            statement
        })
        .collect();
    handler.on_statements(peer_id, flood);

    let reports = network.get_reports();
    assert!(
        reports
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
        "Expected STATEMENT_FLOODING reputation change, but got: {:?}",
        reports
    );
    let disconnected = network.get_disconnected_peers();
    assert!(
        disconnected.contains(&peer_id),
        "Expected peer {} to be disconnected, but it wasn't. Disconnected peers: {:?}",
        peer_id,
        disconnected
    );
    assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
    assert!(
        !handler.pending_initial_syncs.contains_key(&peer_id),
        "Peer should be removed from pending_initial_syncs"
    );
    assert!(
        !handler.initial_sync_peer_queue.contains(&peer_id),
        "Peer should be removed from initial_sync_peer_queue"
    );
}
/// Traffic within the configured rate limit must never trip the flooding
/// detector, even when sustained over several seconds.
#[tokio::test]
async fn test_legitimate_traffic_not_flagged() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
        build_handler();
    let peer_id = *handler.peers.keys().next().unwrap();

    // Send batches of 5,000 unique statements every 100 ms for 5 seconds.
    let started = std::time::Instant::now();
    let mut counter = 0u32;
    while started.elapsed() < std::time::Duration::from_secs(5) {
        let batch: Vec<_> = (0..5_000)
            .map(|i| {
                let mut statement = Statement::new();
                statement.set_plain_data(vec![
                    counter as u8,
                    (counter >> 8) as u8,
                    (counter >> 16) as u8,
                    i as u8,
                ]);
                counter = counter.wrapping_add(1);
                statement
            })
            .collect();
        handler.on_statements(peer_id, batch);
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }

    let reports = network.get_reports();
    assert!(
        !reports
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
        "Legitimate traffic should not trigger flooding detection. Reports: {:?}",
        reports
    );
    let disconnected = network.get_disconnected_peers();
    assert!(
        !disconnected.contains(&peer_id),
        "Legitimate traffic should not cause disconnection. Disconnected peers: {:?}",
        disconnected
    );
    assert!(handler.peers.contains_key(&peer_id), "Peer should still be connected");
}
/// A single batch just above the burst allowance must be flagged as flooding
/// and get the peer disconnected.
#[tokio::test]
async fn test_just_over_rate_limit_triggers_flooding() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
        build_handler();
    let peer_id = *handler.peers.keys().next().unwrap();

    // 260,000 unique statements in one delivery.
    let statements: Vec<_> = (0..260_000)
        .map(|i| {
            let mut statement = Statement::new();
            statement.set_plain_data(vec![
                i as u8,
                (i >> 8) as u8,
                (i >> 16) as u8,
                (i >> 24) as u8,
            ]);
            statement
        })
        .collect();
    handler.on_statements(peer_id, statements);

    let reports = network.get_reports();
    let expected_burst = DEFAULT_STATEMENTS_PER_SECOND * config::STATEMENTS_BURST_COEFFICIENT;
    assert!(
        reports
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
        "Sending 260,000 statements should trigger flooding (burst limit: {}). Reports: {:?}",
        expected_burst,
        reports
    );
    let disconnected = network.get_disconnected_peers();
    assert!(
        disconnected.contains(&peer_id),
        "Peer should be disconnected after exceeding rate limit. Disconnected: {:?}",
        disconnected
    );
    assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
}
/// A one-off burst of 250,000 statements stays within the burst allowance
/// and must not be treated as flooding.
#[tokio::test]
async fn test_burst_of_250k_statements_allowed() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
        build_handler();
    let peer_id = *handler.peers.keys().next().unwrap();

    let statements: Vec<_> = (0..250_000)
        .map(|i| {
            let mut statement = Statement::new();
            statement.set_plain_data(vec![
                i as u8,
                (i >> 8) as u8,
                (i >> 16) as u8,
                (i >> 24) as u8,
            ]);
            statement
        })
        .collect();
    handler.on_statements(peer_id, statements);

    let reports = network.get_reports();
    assert!(
        !reports
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING),
        "250k burst should be allowed (burst = rate × 5). Reports: {:?}",
        reports
    );
    assert!(
        handler.peers.contains_key(&peer_id),
        "Peer should still be connected after 250k burst"
    );
}
/// A sustained rate above the per-second limit must eventually exhaust the
/// burst allowance and be flagged as flooding.
#[tokio::test]
async fn test_sustained_rate_above_limit_triggers_flooding() {
    let (mut handler, _statement_store, network, _notification_service, _queue_receiver) =
        build_handler();
    let peer_id = *handler.peers.keys().next().unwrap();

    // Push 30,000 statements every 100 ms (~300k/sec) for up to 5 seconds.
    let started = std::time::Instant::now();
    let mut counter = 0u32;
    let mut flooding_detected = false;
    while started.elapsed() < std::time::Duration::from_secs(5) {
        let batch: Vec<_> = (0..30_000)
            .map(|i| {
                let mut statement = Statement::new();
                statement.set_plain_data(vec![
                    counter as u8,
                    (counter >> 8) as u8,
                    (counter >> 16) as u8,
                    i as u8,
                ]);
                counter = counter.wrapping_add(1);
                statement
            })
            .collect();
        handler.on_statements(peer_id, batch);
        let flagged = network
            .get_reports()
            .iter()
            .any(|(id, rep)| *id == peer_id && *rep == rep::STATEMENT_FLOODING);
        if flagged {
            flooding_detected = true;
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }

    assert!(flooding_detected, "Sustained rate of 300k/sec should trigger flooding");
    let disconnected = network.get_disconnected_peers();
    assert!(
        disconnected.contains(&peer_id),
        "Peer should be disconnected after sustained high rate. Disconnected: {:?}",
        disconnected
    );
    assert!(!handler.peers.contains_key(&peer_id), "Peer should be removed from peers map");
}
}