mod bincode_format;
pub(crate) mod blocklist;
mod chain_info;
mod config;
mod counting_format;
mod error;
mod event;
mod gossiped_address;
mod health;
mod identity;
mod insights;
mod limiter;
mod message;
mod message_pack_format;
mod metrics;
mod outgoing;
mod symmetry;
pub(crate) mod tasks;
#[cfg(test)]
mod tests;
use std::{
collections::{
hash_map::{Entry, HashMap},
BTreeMap, BTreeSet, HashSet,
},
fmt::{self, Debug, Display, Formatter},
io,
net::{SocketAddr, TcpListener},
sync::{Arc, Weak},
time::{Duration, Instant},
};
use datasize::DataSize;
use itertools::Itertools;
use prometheus::Registry;
use rand::{
seq::{IteratorRandom, SliceRandom},
Rng,
};
use serde::{Deserialize, Serialize};
use tokio::{
net::TcpStream,
sync::{
mpsc::{self, UnboundedSender},
watch,
},
task::JoinHandle,
};
use tokio_openssl::SslStream;
use tokio_util::codec::LengthDelimitedCodec;
use tracing::{debug, error, info, trace, warn, Instrument, Span};
#[cfg(test)]
use futures::{future::BoxFuture, FutureExt};
use casper_types::{EraId, PublicKey, SecretKey};
pub(crate) use self::{
bincode_format::BincodeFormat,
config::{Config, IdentityConfig},
error::Error,
event::Event,
gossiped_address::GossipedAddress,
identity::Identity,
insights::NetworkInsights,
message::{
within_message_size_limit_tolerance, EstimatorWeights, FromIncoming, Message, MessageKind,
Payload,
},
};
use self::{
blocklist::BlocklistJustification,
chain_info::ChainInfo,
counting_format::{ConnectionId, CountingFormat, Role},
error::{ConnectionError, Result},
event::{IncomingConnection, OutgoingConnection},
health::{HealthConfig, TaggedTimestamp},
limiter::Limiter,
message::NodeKeyPair,
metrics::Metrics,
outgoing::{DialOutcome, DialRequest, OutgoingConfig, OutgoingManager},
symmetry::ConnectionSymmetry,
tasks::{MessageQueueItem, NetworkContext},
};
use crate::{
components::{gossiper::GossipItem, Component, ComponentState, InitializedComponent},
effect::{
announcements::PeerBehaviorAnnouncement,
requests::{BeginGossipRequest, NetworkInfoRequest, NetworkRequest, StorageRequest},
AutoClosingResponder, EffectBuilder, EffectExt, Effects, GossipTarget,
},
reactor::ReactorEvent,
tls,
types::{NodeId, ValidatorMatrix},
utils::{self, display_error, Source},
NodeRng,
};
/// Name of this component, used in logging and state reporting.
const COMPONENT_NAME: &str = "network";
/// Number of dial attempts for an outgoing address (passed to the outgoing
/// manager as `retry_attempts`).
const RECONNECTION_ATTEMPTS: u8 = 8;
/// Base timeout for reconnection attempts (passed as `base_timeout`).
const BASE_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
/// Interval between periodic `SweepOutgoing` housekeeping events.
const OUTGOING_MANAGER_SWEEP_INTERVAL: Duration = Duration::from_secs(1);
/// Interval between pings (part of the outgoing manager's health config).
const PING_INTERVAL: Duration = Duration::from_secs(30);
/// Timeout for a single ping (health config).
const PING_TIMEOUT: Duration = Duration::from_secs(6);
/// Number of ping retries (health config; also determines `pong_limit`).
const PING_RETRIES: u16 = 5;
/// Handle for an established outgoing connection.
///
/// Stored in the outgoing manager as the route to a peer; `send_message`
/// pushes messages onto `sender`, which feeds the connection's sender task.
#[derive(Clone, DataSize, Debug)]
pub(crate) struct OutgoingHandle<P> {
/// Queue of messages (with optional responders) to be sent to the peer.
#[data_size(skip)] sender: UnboundedSender<MessageQueueItem<P>>,
/// Address of the remote peer this handle sends to.
peer_addr: SocketAddr,
}
impl<P> Display for OutgoingHandle<P> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "outgoing handle to {}", self.peer_addr)
}
}
/// The networking component: owns all incoming and outgoing connections of a
/// node and routes messages between the reactor and remote peers.
#[derive(DataSize)]
pub(crate) struct Network<REv, P>
where
REv: 'static,
P: Payload,
{
/// Network configuration (addresses, limits, intervals).
cfg: Config,
/// Shared context (identity, chain info, metrics) handed to network tasks.
context: Arc<NetworkContext<REv>>,
/// State machine tracking outgoing connections and dial attempts.
outgoing_manager: OutgoingManager<OutgoingHandle<P>, ConnectionError>,
/// Per-peer bookkeeping of incoming/outgoing connection symmetry.
connection_symmetries: HashMap<NodeId, ConnectionSymmetry>,
/// Peers that reported themselves as syncing; sending them unsafe payloads
/// is logged as an error (see `send_message`).
syncing_nodes: HashSet<NodeId>,
#[data_size(skip)]
/// Shutdown channels and server join handle; `None` until `initialize`.
channel_management: Option<ChannelManagement>,
#[data_size(skip)]
/// Prometheus metrics for this component.
net_metrics: Arc<Metrics>,
#[data_size(skip)]
/// Limits outgoing byte rate for non-validators.
outgoing_limiter: Limiter,
#[data_size(skip)]
/// Limits incoming message rate for non-validators.
incoming_limiter: Limiter,
/// Current era; initialized to era 0 in `new`.
active_era: EraId,
/// Lifecycle state of the component.
state: ComponentState,
}
/// Channels and handles used to shut down the component's background tasks.
///
/// Created by `initialize`; dropping the senders (see the test-only
/// `Finalize` impl) signals the corresponding tasks to stop.
struct ChannelManagement {
/// Watch sender for the server task's shutdown signal.
#[allow(dead_code)]
shutdown_sender: Option<watch::Sender<()>>,
/// Join handle of the spawned server background task.
#[allow(dead_code)]
server_join_handle: Option<JoinHandle<()>>,
/// Watch sender signalling incoming message readers to close.
#[allow(dead_code)]
close_incoming_sender: Option<watch::Sender<()>>,
/// Receiver cloned into every incoming message reader (see
/// `handle_incoming_connection`).
close_incoming_receiver: watch::Receiver<()>,
}
impl<REv, P> Network<REv, P>
where
P: Payload + 'static,
REv: ReactorEvent
+ From<Event<P>>
+ FromIncoming<P>
+ From<StorageRequest>
+ From<NetworkRequest<P>>
+ From<PeerBehaviorAnnouncement>
+ From<BeginGossipRequest<GossipedAddress>>,
{
#[allow(clippy::type_complexity)]
/// Creates a new network component.
///
/// The component starts in the `Uninitialized` state; no sockets are opened
/// and no tasks are spawned until `initialize` runs.
///
/// * `node_key_pair` - optional consensus key pair, wrapped into the network
///   context via `NodeKeyPair::new`.
/// * `allow_handshake` - passed through to the network context.
///
/// # Errors
///
/// Returns an error if the metrics cannot be registered with `registry`.
pub(crate) fn new<C: Into<ChainInfo>>(
cfg: Config,
our_identity: Identity,
node_key_pair: Option<(Arc<SecretKey>, PublicKey)>,
registry: &Registry,
chain_info_source: C,
validator_matrix: ValidatorMatrix,
allow_handshake: bool,
) -> Result<Network<REv, P>> {
let net_metrics = Arc::new(Metrics::new(registry)?);
// Only non-validator outgoing traffic is rate-limited.
let outgoing_limiter = Limiter::new(
cfg.max_outgoing_byte_rate_non_validators,
net_metrics.accumulated_outgoing_limiter_delay.clone(),
validator_matrix.clone(),
);
// Likewise, only non-validator incoming message rate is limited.
let incoming_limiter = Limiter::new(
cfg.max_incoming_message_rate_non_validators,
net_metrics.accumulated_incoming_limiter_delay.clone(),
validator_matrix,
);
let outgoing_manager = OutgoingManager::with_metrics(
OutgoingConfig {
retry_attempts: RECONNECTION_ATTEMPTS,
base_timeout: BASE_RECONNECTION_TIMEOUT,
unblock_after_min: cfg.blocklist_retain_min_duration.into(),
unblock_after_max: cfg.blocklist_retain_max_duration.into(),
sweep_timeout: cfg.max_addr_pending_time.into(),
health: HealthConfig {
ping_interval: PING_INTERVAL,
ping_timeout: PING_TIMEOUT,
ping_retries: PING_RETRIES,
// Pong limit derived from the retry count.
pong_limit: (1 + PING_RETRIES as u32) * 2,
},
},
net_metrics.create_outgoing_metrics(),
);
let context = Arc::new(NetworkContext::new(
&cfg,
our_identity,
node_key_pair.map(NodeKeyPair::new),
chain_info_source.into(),
&net_metrics,
allow_handshake,
));
let component = Network {
cfg,
context,
outgoing_manager,
connection_symmetries: HashMap::new(),
syncing_nodes: HashSet::new(),
channel_management: None,
net_metrics,
outgoing_limiter,
incoming_limiter,
active_era: EraId::new(0),
state: ComponentState::Uninitialized,
};
Ok(component)
}
/// Initializes the component: resolves configured addresses, binds the TCP
/// listener, spawns the server task, dials all known addresses, and
/// schedules the recurring address-gossip and outgoing-sweep timers.
///
/// # Errors
///
/// Fails if no known address resolves, the public or bind address cannot be
/// resolved, or the listener cannot be created/configured/converted.
fn initialize(&mut self, effect_builder: EffectBuilder<REv>) -> Result<Effects<Event<P>>> {
let mut known_addresses = HashSet::new();
for address in &self.cfg.known_addresses {
match utils::resolve_address(address) {
Ok(known_address) => {
// Duplicates (e.g. two names resolving to one address) are only
// warned about, not treated as errors.
if !known_addresses.insert(known_address) {
warn!(%address, resolved=%known_address, "ignoring duplicated known address");
};
}
Err(ref err) => {
warn!(%address, err=display_error(err), "failed to resolve known address");
}
}
}
// With no known address at all there is no way to join the network.
if known_addresses.is_empty() {
warn!("no known addresses provided via config or all failed DNS resolution");
return Err(Error::EmptyKnownHosts);
}
let mut public_addr =
utils::resolve_address(&self.cfg.public_address).map_err(Error::ResolveAddr)?;
let bind_address =
utils::resolve_address(&self.cfg.bind_address).map_err(Error::ResolveAddr)?;
// Bind via std first; it must be set non-blocking before conversion to a
// tokio listener below.
let listener = TcpListener::bind(bind_address)
.map_err(|error| Error::ListenerCreation(error, bind_address))?;
listener
.set_nonblocking(true)
.map_err(Error::ListenerSetNonBlocking)?;
let local_addr = listener.local_addr().map_err(Error::ListenerAddr)?;
// A configured public port of 0 means "use whatever port was bound".
if public_addr.port() == 0 {
public_addr.set_port(local_addr.port());
}
// No tasks have been spawned yet, so this is the only `Arc` pointer and
// mutating the context is safe.
Arc::get_mut(&mut self.context)
.expect("should be no other pointers")
.initialize(public_addr, effect_builder.into_inner());
let protocol_version = self.context.chain_info().protocol_version;
info!(%local_addr, %public_addr, %protocol_version, "starting server background task");
let (server_shutdown_sender, server_shutdown_receiver) = watch::channel(());
let (close_incoming_sender, close_incoming_receiver) = watch::channel(());
let context = self.context.clone();
let server_join_handle = tokio::spawn(
tasks::server(
context,
tokio::net::TcpListener::from_std(listener).map_err(Error::ListenerConversion)?,
server_shutdown_receiver,
)
.in_current_span(),
);
let channel_management = ChannelManagement {
shutdown_sender: Some(server_shutdown_sender),
server_join_handle: Some(server_join_handle),
close_incoming_sender: Some(close_incoming_sender),
close_incoming_receiver,
};
self.channel_management = Some(channel_management);
// Learn all known addresses immediately and dial them.
let now = Instant::now();
let dial_requests: Vec<_> = known_addresses
.into_iter()
.filter_map(|addr| self.outgoing_manager.learn_addr(addr, true, now))
.collect();
let mut effects = self.process_dial_requests(dial_requests);
// Schedule the first address gossip and the first outgoing sweep.
effects.extend(
effect_builder
.set_timeout(self.cfg.initial_gossip_delay.into())
.event(|_| Event::GossipOurAddress),
);
effects.extend(
effect_builder
.set_timeout(OUTGOING_MANAGER_SWEEP_INTERVAL)
.event(|_| Event::SweepOutgoing),
);
<Self as InitializedComponent<REv>>::set_state(self, ComponentState::Initialized);
Ok(effects)
}
/// Returns the channel management data.
///
/// # Panics
///
/// Panics if called before `initialize` has stored the channel management
/// data.
fn channel_management(&self) -> &ChannelManagement {
    match self.channel_management {
        Some(ref channel_management) => channel_management,
        None => panic!("component not initialized properly"),
    }
}
/// Queues a message for every connected peer that is a validator in `era_id`.
fn broadcast_message_to_validators(&self, msg: Arc<Message<P>>, era_id: EraId) {
    self.net_metrics.broadcast_requests.inc();
    // Counters are only used for the debug log line below.
    let mut validator_count = 0;
    let mut peer_count = 0;
    for peer_id in self.outgoing_manager.connected_peers() {
        peer_count += 1;
        if !self.outgoing_limiter.is_validator_in_era(era_id, &peer_id) {
            continue;
        }
        validator_count += 1;
        self.send_message(peer_id, msg.clone(), None);
    }
    debug!(
        msg = %msg,
        era = era_id.value(),
        total_connected_validators_in_era = validator_count,
        total_outgoing_manager_connected_peers = peer_count,
        "broadcast_message_to_validators"
    );
}
/// Queues a message to be gossiped to `count` peers chosen according to
/// `gossip_target`, never choosing peers in `exclude`.
///
/// Returns the set of peers the message was queued for; this may be fewer
/// than `count` if not enough eligible peers are connected.
fn gossip_message(
&self,
rng: &mut NodeRng,
msg: Arc<Message<P>>,
gossip_target: GossipTarget,
count: usize,
exclude: &HashSet<NodeId>,
) -> HashSet<NodeId> {
let is_validator_in_era =
|era: EraId, peer_id: &NodeId| self.outgoing_limiter.is_validator_in_era(era, peer_id);
let peer_ids = choose_gossip_peers(
rng,
gossip_target,
count,
exclude,
self.outgoing_manager.connected_peers(),
is_validator_in_era,
);
// Coming up short is only logged when there actually were non-excluded
// peers available to choose from.
if peer_ids.len() != count {
let not_excluded = self
.outgoing_manager
.connected_peers()
.filter(|peer_id| !exclude.contains(peer_id))
.count();
if not_excluded > 0 {
let connected = self.outgoing_manager.connected_peers().count();
debug!(
our_id=%self.context.our_id(),
%gossip_target,
wanted = count,
connected,
not_excluded,
selected = peer_ids.len(),
"could not select enough random nodes for gossiping"
);
}
}
for &peer_id in &peer_ids {
self.send_message(peer_id, msg.clone(), None);
}
peer_ids.into_iter().collect()
}
/// Queues a message to be sent to a single peer.
///
/// If a responder is given it travels with the message so confirmation can
/// be issued once the message is actually buffered; without a route to
/// `dest` the message is silently dropped (debug-logged).
fn send_message(
    &self,
    dest: NodeId,
    msg: Arc<Message<P>>,
    opt_responder: Option<AutoClosingResponder<()>>,
) {
    let connection = match self.outgoing_manager.get_route(dest) {
        Some(connection) => connection,
        None => {
            // We are not connected, so the reconnection is likely already in progress.
            debug!(our_id=%self.context.our_id(), %dest, ?msg, "dropped outgoing message, no connection");
            return;
        }
    };
    // Payloads that are unsafe for syncing peers are still sent, but loudly
    // flagged, since `dest` reported itself as syncing.
    if msg.payload_is_unsafe_for_syncing_nodes() && self.syncing_nodes.contains(&dest) {
        error!(kind=%msg.classify(), node_id=%dest, "sending unsafe message to syncing node");
    }
    match connection.sender.send((msg, opt_responder)) {
        Ok(()) => self.net_metrics.queued_messages.inc(),
        Err(msg) => {
            // The sender task has gone away; the connection is lost.
            warn!(our_id=%self.context.our_id(), %dest, ?msg, "dropped outgoing message, lost connection");
        }
    }
}
/// Processes the outcome of an incoming connection attempt.
///
/// A fully established connection causes the peer's public address to be
/// learned, the connection symmetry to be updated, and a message reader
/// task to be spawned; all other outcomes are only logged.
fn handle_incoming_connection(
&mut self,
incoming: Box<IncomingConnection<P>>,
span: Span,
) -> Effects<Event<P>> {
span.clone().in_scope(|| match *incoming {
IncomingConnection::FailedEarly {
peer_addr: _,
ref error,
} => {
// Failed before TLS was set up; log and forget.
debug!(err=%display_error(error), "incoming connection failed early");
Effects::new()
}
IncomingConnection::Failed {
peer_addr: _,
peer_id: _,
ref error,
} => {
debug!(
err = display_error(error),
"incoming connection failed after TLS setup"
);
Effects::new()
}
IncomingConnection::Loopback => {
// A connection from ourselves is harmless but useless.
info!("successful incoming loopback connection, will be dropped");
Effects::new()
}
IncomingConnection::Established {
peer_addr,
public_addr,
peer_id,
peer_consensus_public_key,
stream,
} => {
// Enforce the per-peer incoming connection limit; a configured
// limit of 0 disables the check entirely.
if self.cfg.max_incoming_peer_connections != 0 {
if let Some(symmetries) = self.connection_symmetries.get(&peer_id) {
let incoming_count = symmetries
.incoming_addrs()
.map(BTreeSet::len)
.unwrap_or_default();
if incoming_count >= self.cfg.max_incoming_peer_connections as usize {
info!(%public_addr,
%peer_id,
count=incoming_count,
limit=self.cfg.max_incoming_peer_connections,
"rejecting new incoming connection, limit for peer exceeded"
);
return Effects::new();
}
}
}
info!(%public_addr, "new incoming connection established");
// Learn the peer's public address so we can dial back.
let dial_requests =
self.outgoing_manager
.learn_addr(public_addr, false, Instant::now());
let mut effects = self.process_dial_requests(dial_requests);
// Update the connection symmetry; if this incoming connection
// completed the pair, record the peer as fully connected.
if self
.connection_symmetries
.entry(peer_id)
.or_default()
.add_incoming(peer_addr, Instant::now())
{
self.connection_completed(peer_id);
}
// Spawn a reader for the connection; its termination comes back
// as an `IncomingClosed` event carrying the same span.
let boxed_span = Box::new(span.clone());
effects.extend(
tasks::message_reader(
self.context.clone(),
stream,
self.incoming_limiter
.create_handle(peer_id, peer_consensus_public_key),
self.channel_management().close_incoming_receiver.clone(),
peer_id,
span.clone(),
)
.instrument(span)
.event(move |result| Event::IncomingClosed {
result,
peer_id: Box::new(peer_id),
peer_addr,
span: boxed_span,
}),
);
effects
}
})
}
/// Reacts to an incoming connection being closed, cleaning up the
/// connection symmetry bookkeeping for the peer.
fn handle_incoming_closed(
    &mut self,
    result: io::Result<()>,
    peer_id: NodeId,
    peer_addr: SocketAddr,
    span: Span,
) -> Effects<Event<P>> {
    span.in_scope(|| {
        // A closure error is noteworthy but not actionable, so just log it.
        if let Err(ref err) = result {
            warn!(err = display_error(err), "connection dropped");
        } else {
            info!("regular connection closing");
        }
        // Remove the incoming address from the symmetry record; if that was
        // the last connection in either direction, drop the record entirely.
        if let Entry::Occupied(mut entry) = self.connection_symmetries.entry(peer_id) {
            if entry.get_mut().remove_incoming(peer_addr, Instant::now()) {
                entry.remove();
            }
        }
        Effects::new()
    })
}
/// Determines whether an outgoing connection error justifies blocklisting
/// the peer's address.
///
/// Only errors proving the peer belongs to a different network or runs a
/// different chainspec are blockable; transient transport, handshake and
/// certificate failures are not.
fn is_blockable_offense_for_outgoing(
    error: &ConnectionError,
) -> Option<BlocklistJustification> {
    match error {
        // Transport setup, handshake and certificate problems may be
        // transient or caused by either side — never blockable.
        ConnectionError::TlsInitialization(_)
        | ConnectionError::TcpConnection(_)
        | ConnectionError::TcpNoDelay(_)
        | ConnectionError::TlsHandshake(_)
        | ConnectionError::HandshakeSend(_)
        | ConnectionError::HandshakeRecv(_)
        | ConnectionError::HandshakeNotAllowed
        | ConnectionError::IncompatibleVersion(_)
        | ConnectionError::HandshakeSenderCrashed(_)
        | ConnectionError::FailedToReuniteHandshakeSinkAndStream
        | ConnectionError::CouldNotEncodeOurHandshake(_)
        | ConnectionError::NoPeerCertificate
        | ConnectionError::PeerCertificateInvalid(_)
        | ConnectionError::DidNotSendHandshake
        | ConnectionError::InvalidRemoteHandshakeMessage(_)
        | ConnectionError::InvalidConsensusCertificate(_) => None,
        // The peer identified itself as part of a different network.
        ConnectionError::WrongNetwork(peer_network_name) => {
            Some(BlocklistJustification::WrongNetwork {
                peer_network_name: peer_network_name.clone(),
            })
        }
        // The peer runs an incompatible chainspec.
        ConnectionError::WrongChainspecHash(peer_chainspec_hash) => {
            Some(BlocklistJustification::WrongChainspecHash {
                peer_chainspec_hash: *peer_chainspec_hash,
            })
        }
        // The peer failed to prove which chainspec it is running.
        ConnectionError::MissingChainspecHash => {
            Some(BlocklistJustification::MissingChainspecHash)
        }
    }
}
#[allow(clippy::redundant_clone)]
/// Processes the outcome of an outgoing connection attempt.
///
/// Failures are reported to the outgoing manager (blocking the address for
/// blockable offenses); an established connection spawns a message sender
/// task whose termination comes back as `OutgoingDropped`.
fn handle_outgoing_connection(
&mut self,
outgoing: OutgoingConnection<P>,
span: Span,
rng: &mut NodeRng,
) -> Effects<Event<P>> {
let now = Instant::now();
span.clone().in_scope(|| match outgoing {
OutgoingConnection::FailedEarly { peer_addr, error }
| OutgoingConnection::Failed {
peer_addr,
peer_id: _,
error,
} => {
debug!(err=%display_error(&error), "outgoing connection failed");
let mut requests = Vec::new();
// Some errors prove the peer does not belong on this network;
// those cause the address to be blocked.
if let Some(justification) = Self::is_blockable_offense_for_outgoing(&error) {
requests.extend(self.outgoing_manager.block_addr(
peer_addr,
now,
justification,
rng,
));
}
requests.extend(
self.outgoing_manager
.handle_dial_outcome(DialOutcome::Failed {
addr: peer_addr,
error,
when: now,
}),
);
self.process_dial_requests(requests)
}
OutgoingConnection::Loopback { peer_addr } => {
// We dialed ourselves; report it so the address is not retried.
info!("successful outgoing loopback connection, will be dropped");
let request = self
.outgoing_manager
.handle_dial_outcome(DialOutcome::Loopback { addr: peer_addr });
self.process_dial_requests(request)
}
OutgoingConnection::Established {
peer_addr,
peer_id,
peer_consensus_public_key,
sink,
is_syncing,
} => {
info!("new outgoing connection established");
// The unbounded channel feeds the sender task spawned below; its
// sending half lives in the handle given to the outgoing manager.
let (sender, receiver) = mpsc::unbounded_channel();
let handle = OutgoingHandle { sender, peer_addr };
let request = self
.outgoing_manager
.handle_dial_outcome(DialOutcome::Successful {
addr: peer_addr,
handle,
node_id: peer_id,
when: now,
});
let mut effects = self.process_dial_requests(request);
// Update the connection symmetry; if this outgoing connection
// completed the pair, also track the peer's syncing state.
if self
.connection_symmetries
.entry(peer_id)
.or_default()
.mark_outgoing(now)
{
self.connection_completed(peer_id);
self.update_syncing_nodes_set(peer_id, is_syncing);
}
effects.extend(
tasks::message_sender(
receiver,
sink,
self.outgoing_limiter
.create_handle(peer_id, peer_consensus_public_key),
self.net_metrics.queued_messages.clone(),
)
.instrument(span)
.event(move |_| Event::OutgoingDropped {
peer_id: Box::new(peer_id),
peer_addr,
}),
);
effects
}
})
}
/// Handles a reactor request to send, broadcast or gossip a payload.
fn handle_network_request(
&self,
request: NetworkRequest<P>,
rng: &mut NodeRng,
) -> Effects<Event<P>> {
match request {
NetworkRequest::SendMessage {
dest,
payload,
respond_after_queueing,
auto_closing_responder,
} => {
self.net_metrics.direct_message_requests.inc();
if respond_after_queueing {
// Respond as soon as the request is processed locally, instead
// of handing the responder to the connection's queue.
self.send_message(*dest, Arc::new(Message::Payload(*payload)), None);
auto_closing_responder.respond(()).ignore()
} else {
// Pass the responder along so confirmation is given once the
// message has actually been buffered on the connection.
self.send_message(
*dest,
Arc::new(Message::Payload(*payload)),
Some(auto_closing_responder),
);
Effects::new()
}
}
NetworkRequest::ValidatorBroadcast {
payload,
era_id,
auto_closing_responder,
} => {
// Broadcast to validators of the given era and respond immediately.
self.broadcast_message_to_validators(Arc::new(Message::Payload(*payload)), era_id);
auto_closing_responder.respond(()).ignore()
}
NetworkRequest::Gossip {
payload,
gossip_target,
count,
exclude,
auto_closing_responder,
} => {
// Gossip to a chosen subset of peers and report which were picked.
let sent_to = self.gossip_message(
rng,
Arc::new(Message::Payload(*payload)),
gossip_target,
count,
&exclude,
);
auto_closing_responder.respond(sent_to).ignore()
}
}
}
/// Reacts to an outgoing connection being dropped: informs the outgoing
/// manager, updates the connection symmetry and the limiter, and processes
/// any resulting dial requests.
fn handle_outgoing_dropped(
    &mut self,
    peer_id: NodeId,
    peer_addr: SocketAddr,
) -> Effects<Event<P>> {
    let dial_requests = self
        .outgoing_manager
        .handle_connection_drop(peer_addr, Instant::now());
    // Unmark the outgoing half of the symmetry; remove the record entirely
    // if no connection in either direction remains.
    match self.connection_symmetries.entry(peer_id) {
        Entry::Occupied(mut entry) => {
            if entry.get_mut().unmark_outgoing(Instant::now()) {
                entry.remove();
            }
        }
        Entry::Vacant(_) => {}
    }
    self.outgoing_limiter.remove_connected_validator(&peer_id);
    self.process_dial_requests(dial_requests)
}
/// Processes a set of `DialRequest`s from the outgoing manager, producing
/// the corresponding effects: dials spawn `connect_outgoing` tasks,
/// disconnects drop the connection handle, and pings are queued like
/// regular messages.
fn process_dial_requests<T>(&mut self, requests: T) -> Effects<Event<P>>
where
T: IntoIterator<Item = DialRequest<OutgoingHandle<P>>>,
{
let mut effects = Effects::new();
for request in requests {
trace!(%request, "processing dial request");
match request {
// The dial outcome is fed back as an `OutgoingConnection` event,
// carrying the same span.
DialRequest::Dial { addr, span } => effects.extend(
tasks::connect_outgoing(self.context.clone(), addr)
.instrument(span.clone())
.event(|outgoing| Event::OutgoingConnection {
outgoing: Box::new(outgoing),
span,
}),
),
DialRequest::Disconnect { handle: _, span } => {
// The handle is discarded here (`handle: _`); dropping the
// contained sender closes the connection's message queue.
span.in_scope(|| {
debug!("dropping connection, as requested");
});
}
DialRequest::SendPing {
peer_id,
nonce,
span,
} => span.in_scope(|| {
trace!("enqueuing ping to be sent");
self.send_message(peer_id, Arc::new(Message::Ping { nonce }), None);
}),
}
}
effects
}
/// Handles a message received from a peer: pings are answered with pongs,
/// pongs are recorded with the outgoing manager, payloads are announced to
/// the reactor, and stray handshakes are logged.
fn handle_incoming_message(
&mut self,
effect_builder: EffectBuilder<REv>,
peer_id: NodeId,
msg: Message<P>,
span: Span,
) -> Effects<Event<P>>
where
REv: FromIncoming<P> + From<PeerBehaviorAnnouncement>,
{
span.in_scope(|| match msg {
Message::Handshake { .. } => {
// Handshakes are part of connection setup, not regular traffic.
warn!("received unexpected handshake");
Effects::new()
}
Message::Ping { nonce } => {
// Answer directly with a pong carrying the same nonce.
self.send_message(peer_id, Arc::new(Message::Pong { nonce }), None);
Effects::new()
}
Message::Pong { nonce } => {
// Timestamp the pong and let the outgoing manager judge it.
let pong = TaggedTimestamp::from_parts(Instant::now(), nonce);
if self.outgoing_manager.record_pong(peer_id, pong) {
info!(
"peer {} exceeded failed pong limit, or allowed number of pongs",
peer_id );
}
Effects::new()
}
Message::Payload(payload) => {
// Actual payloads are forwarded to the reactor.
effect_builder.announce_incoming(peer_id, payload).ignore()
}
})
}
/// Records that a peer is now fully connected and updates the peer count
/// metric.
fn connection_completed(&self, peer_id: NodeId) {
    // `peers()` builds a fresh map on every call; the original computed it
    // twice (once for the log line, once for the metric). Compute it once.
    let num_peers = self.peers().len();
    trace!(num_peers, new_peer=%peer_id, "connection complete");
    self.net_metrics.peers.set(num_peers as i64);
}
/// Updates the set of syncing peers according to `is_syncing`.
fn update_syncing_nodes_set(&mut self, peer_id: NodeId, is_syncing: bool) {
    if !is_syncing {
        debug!(%peer_id, "is no longer syncing");
        self.syncing_nodes.remove(&peer_id);
        return;
    }
    debug!(%peer_id, "is syncing");
    self.syncing_nodes.insert(peer_id);
}
/// Returns a map of all connected peers to a display string of one of
/// their addresses.
///
/// Outgoing routes take precedence; peers only reachable via incoming
/// connections are added with their first recorded incoming address.
pub(crate) fn peers(&self) -> BTreeMap<NodeId, String> {
    let mut peers = BTreeMap::new();
    for node_id in self.outgoing_manager.connected_peers() {
        match self.outgoing_manager.get_route(node_id) {
            Some(connection) => {
                peers.insert(node_id, connection.peer_addr.to_string());
            }
            // A connected peer should always have a route; log if not.
            None => warn!(%node_id, "route disappeared unexpectedly"),
        }
    }
    for (node_id, sym) in &self.connection_symmetries {
        for addr in sym.incoming_addrs().into_iter().flatten() {
            peers.entry(*node_id).or_insert_with(|| addr.to_string());
        }
    }
    peers
}
/// Returns up to `count` node IDs of fully connected peers (those with a
/// symmetric connection), sampled at random.
pub(crate) fn fully_connected_peers_random(
    &self,
    rng: &mut NodeRng,
    count: usize,
) -> Vec<NodeId> {
    let symmetric_peers = self
        .connection_symmetries
        .iter()
        .filter_map(|(node_id, sym)| {
            matches!(sym, ConnectionSymmetry::Symmetric { .. }).then(|| *node_id)
        });
    symmetric_peers.choose_multiple(rng, count)
}
/// Returns whether the number of fully connected (symmetric) peers has
/// reached the configured initialization threshold.
pub(crate) fn has_sufficient_fully_connected_peers(&self) -> bool {
    let symmetric_count = self
        .connection_symmetries
        .values()
        .filter(|sym| matches!(sym, ConnectionSymmetry::Symmetric { .. }))
        .count();
    symmetric_count >= self.cfg.min_peers_for_initialization as usize
}
/// Returns this node's ID. Test helper.
#[cfg(test)]
pub(crate) fn node_id(&self) -> NodeId {
self.context.our_id()
}
}
/// Maximum number of attempts to wait for the metrics `Arc` to be dropped
/// during test finalization.
#[cfg(test)]
const MAX_METRICS_DROP_ATTEMPTS: usize = 25;
/// Delay between metrics-drop wait attempts.
#[cfg(test)]
const DROP_RETRY_DELAY: Duration = Duration::from_millis(100);
#[cfg(test)]
impl<REv, P> crate::reactor::Finalize for Network<REv, P>
where
REv: Send + 'static,
P: Payload,
{
/// Shuts the component down for tests: signals the server and incoming
/// readers to stop, awaits the server task, then waits for all other
/// references to the metrics `Arc` to be dropped.
fn finalize(mut self) -> BoxFuture<'static, ()> {
async move {
if let Some(mut channel_management) = self.channel_management.take() {
// Dropping the watch senders closes the channels, which signals
// the server and reader tasks to shut down.
drop(channel_management.shutdown_sender.take());
drop(channel_management.close_incoming_sender.take());
if let Some(join_handle) = channel_management.server_join_handle.take() {
match join_handle.await {
Ok(_) => debug!(our_id=%self.context.our_id(), "server exited cleanly"),
Err(ref err) => {
error!(
our_id=%self.context.our_id(),
err=display_error(err),
"could not join server task cleanly"
);
}
}
}
}
// Wait (bounded by attempts * delay) until this is the last `Arc`
// pointing at the metrics.
utils::wait_for_arc_drop(
self.net_metrics,
MAX_METRICS_DROP_ATTEMPTS,
DROP_RETRY_DELAY,
)
.await;
}
.boxed()
}
}
/// Chooses up to `count` peers from `connected_peers` for gossiping, never
/// choosing peers in `exclude`.
///
/// For `GossipTarget::All`, peers are sampled uniformly. For
/// `GossipTarget::Mixed`, validators of the era (per `is_validator_in_era`)
/// and other peers are sampled separately and interleaved, starting with a
/// randomly picked group, aiming for an even mix and topping up from one
/// group when the other is too small.
fn choose_gossip_peers<F>(
rng: &mut NodeRng,
gossip_target: GossipTarget,
count: usize,
exclude: &HashSet<NodeId>,
connected_peers: impl Iterator<Item = NodeId>,
is_validator_in_era: F,
) -> HashSet<NodeId>
where
F: Fn(EraId, &NodeId) -> bool,
{
let filtered_peers = connected_peers.filter(|peer_id| !exclude.contains(peer_id));
match gossip_target {
GossipTarget::Mixed(era_id) => {
let (validators, non_validators): (Vec<_>, Vec<_>) =
filtered_peers.partition(|node_id| is_validator_in_era(era_id, node_id));
// Pick the starting group at random so neither group is
// systematically favored when `count` is odd.
let (first, second) = if rng.gen() {
(validators, non_validators)
} else {
(non_validators, validators)
};
// Sample up to `count` from each group; interleaving and then
// truncating yields an even split while still filling up from one
// group if the other falls short.
first
.choose_multiple(rng, count)
.interleave(second.iter().choose_multiple(rng, count))
.take(count)
.copied()
.collect()
}
GossipTarget::All => filtered_peers
.choose_multiple(rng, count)
.into_iter()
.collect(),
}
}
impl<REv, P> Component<REv> for Network<REv, P>
where
REv: ReactorEvent
+ From<Event<P>>
+ From<BeginGossipRequest<GossipedAddress>>
+ FromIncoming<P>
+ From<StorageRequest>
+ From<NetworkRequest<P>>
+ From<PeerBehaviorAnnouncement>,
P: Payload,
{
type Event = Event<P>;
/// Returns the component's name, used for logging.
fn name(&self) -> &str {
COMPONENT_NAME
}
/// Dispatches an event according to the component's lifecycle state:
/// everything except `Initialize` is rejected (with a log message) until
/// the component is `Initialized`.
fn handle_event(
&mut self,
effect_builder: EffectBuilder<REv>,
rng: &mut NodeRng,
event: Self::Event,
) -> Effects<Self::Event> {
match &self.state {
ComponentState::Fatal(msg) => {
error!(
msg,
?event,
name = <Self as Component<REv>>::name(self),
"should not handle this event when this component has fatal error"
);
Effects::new()
}
ComponentState::Uninitialized => {
warn!(
?event,
name = <Self as Component<REv>>::name(self),
"should not handle this event when component is uninitialized"
);
Effects::new()
}
ComponentState::Initializing => match event {
Event::Initialize => match self.initialize(effect_builder) {
Ok(effects) => effects,
Err(error) => {
// An initialization failure is fatal for this component.
error!(%error, "failed to initialize network component");
<Self as InitializedComponent<REv>>::set_state(
self,
ComponentState::Fatal(error.to_string()),
);
Effects::new()
}
},
Event::IncomingConnection { .. }
| Event::IncomingMessage { .. }
| Event::IncomingClosed { .. }
| Event::OutgoingConnection { .. }
| Event::OutgoingDropped { .. }
| Event::NetworkRequest { .. }
| Event::NetworkInfoRequest { .. }
| Event::GossipOurAddress
| Event::PeerAddressReceived(_)
| Event::SweepOutgoing
| Event::BlocklistAnnouncement(_) => {
warn!(
?event,
name = <Self as Component<REv>>::name(self),
"should not handle this event when component is pending initialization"
);
Effects::new()
}
},
ComponentState::Initialized => match event {
Event::Initialize => {
error!(
?event,
name = <Self as Component<REv>>::name(self),
"component already initialized"
);
Effects::new()
}
Event::IncomingConnection { incoming, span } => {
self.handle_incoming_connection(incoming, span)
}
Event::IncomingMessage { peer_id, msg, span } => {
self.handle_incoming_message(effect_builder, *peer_id, *msg, span)
}
Event::IncomingClosed {
result,
peer_id,
peer_addr,
span,
} => self.handle_incoming_closed(result, *peer_id, peer_addr, *span),
Event::OutgoingConnection { outgoing, span } => {
self.handle_outgoing_connection(*outgoing, span, rng)
}
Event::OutgoingDropped { peer_id, peer_addr } => {
self.handle_outgoing_dropped(*peer_id, peer_addr)
}
Event::NetworkRequest { req: request } => {
self.handle_network_request(*request, rng)
}
Event::NetworkInfoRequest { req } => match *req {
NetworkInfoRequest::Peers { responder } => {
responder.respond(self.peers()).ignore()
}
NetworkInfoRequest::FullyConnectedPeers { count, responder } => responder
.respond(self.fully_connected_peers_random(rng, count))
.ignore(),
NetworkInfoRequest::Insight { responder } => responder
.respond(NetworkInsights::collect_from_component(self))
.ignore(),
},
Event::GossipOurAddress => {
// Gossip our own public address, then reschedule this event.
let our_address = GossipedAddress::new(
self.context
.public_addr()
.expect("component not initialized properly"),
);
let mut effects = effect_builder
.begin_gossip(our_address, Source::Ourself, our_address.gossip_target())
.ignore();
effects.extend(
effect_builder
.set_timeout(self.cfg.gossip_interval.into())
.event(|_| Event::GossipOurAddress),
);
effects
}
Event::PeerAddressReceived(gossiped_address) => {
// A peer address learned via gossip is never trusted ("false").
let requests = self.outgoing_manager.learn_addr(
gossiped_address.into(),
false,
Instant::now(),
);
self.process_dial_requests(requests)
}
Event::SweepOutgoing => {
// Run periodic outgoing-manager housekeeping, then reschedule.
let now = Instant::now();
let requests = self.outgoing_manager.perform_housekeeping(rng, now);
let mut effects = self.process_dial_requests(requests);
effects.extend(
effect_builder
.set_timeout(OUTGOING_MANAGER_SWEEP_INTERVAL)
.event(|_| Event::SweepOutgoing),
);
effects
}
Event::BlocklistAnnouncement(announcement) => match announcement {
PeerBehaviorAnnouncement::OffenseCommitted {
offender,
justification,
} => {
info!(%offender, %justification, "adding peer to blocklist after transgression");
// Blocking operates on addresses; an offender without a
// known address cannot be blocked here.
if let Some(addr) = self.outgoing_manager.get_addr(*offender) {
let requests = self.outgoing_manager.block_addr(
addr,
Instant::now(),
*justification,
rng,
);
self.process_dial_requests(requests)
} else {
Effects::new()
}
}
},
},
}
}
}
impl<REv, P> InitializedComponent<REv> for Network<REv, P>
where
REv: ReactorEvent
+ From<Event<P>>
+ From<BeginGossipRequest<GossipedAddress>>
+ FromIncoming<P>
+ From<StorageRequest>
+ From<NetworkRequest<P>>
+ From<PeerBehaviorAnnouncement>,
P: Payload,
{
/// Returns the component's current lifecycle state.
fn state(&self) -> &ComponentState {
&self.state
}
/// Sets a new lifecycle state, logging the transition.
fn set_state(&mut self, new_state: ComponentState) {
info!(
?new_state,
name = <Self as Component<REv>>::name(self),
"component state changed"
);
self.state = new_state;
}
}
/// The raw transport: a TLS stream over TCP.
type Transport = SslStream<TcpStream>;
/// The full transport: length-delimited frames carrying bincode-serialized
/// messages, with byte counting via `CountingFormat`.
pub(crate) type FullTransport<P> = tokio_serde::Framed<
FramedTransport,
Message<P>,
Arc<Message<P>>,
CountingFormat<BincodeFormat>,
>;
/// Length-delimited framing layered over the raw transport.
pub(crate) type FramedTransport = tokio_util::codec::Framed<Transport, LengthDelimitedCodec>;
/// Builds a full message transport on top of a framed transport.
///
/// Layers a counting bincode codec over the length-delimited framing;
/// `role` and `connection_id` identify the connection for the metrics.
fn full_transport<P>(
    metrics: Weak<Metrics>,
    connection_id: ConnectionId,
    framed: FramedTransport,
    role: Role,
) -> FullTransport<P>
where
    for<'de> P: Serialize + Deserialize<'de>,
    for<'de> Message<P>: Serialize + Deserialize<'de>,
{
    let codec = CountingFormat::new(metrics, connection_id, role, BincodeFormat::default());
    tokio_serde::Framed::new(framed, codec)
}
/// Wraps a raw transport in length-delimited framing, capping frames at
/// `maximum_net_message_size` bytes.
fn framed_transport(transport: Transport, maximum_net_message_size: u32) -> FramedTransport {
    let codec = LengthDelimitedCodec::builder()
        .max_frame_length(maximum_net_message_size as usize)
        .new_codec();
    tokio_util::codec::Framed::new(transport, codec)
}
impl<R, P> Debug for Network<R, P>
where
P: Payload,
{
/// Prints a compact representation containing only the node ID, component
/// state and public address.
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Network")
.field("our_id", &self.context.our_id())
.field("state", &self.state)
.field("public_addr", &self.context.public_addr())
.finish()
}
}
#[cfg(test)]
mod gossip_target_tests {
use std::{collections::BTreeSet, iter};
use static_assertions::const_assert;
use casper_types::testing::TestRng;
use super::*;
/// Number of validator peers in the fixture.
const VALIDATOR_COUNT: usize = 10;
/// Number of non-validator peers in the fixture; must exceed the validator
/// count (required by the exclusion scenarios below).
const NON_VALIDATOR_COUNT: usize = 20;
const_assert!(VALIDATOR_COUNT < NON_VALIDATOR_COUNT);
/// Test fixture: a set of validator node IDs, a set of non-validator node
/// IDs, and a shuffled list containing all of them.
struct Fixture {
validators: BTreeSet<NodeId>,
non_validators: BTreeSet<NodeId>,
all_peers: Vec<NodeId>,
}
impl Fixture {
    /// Creates a fixture from freshly generated random node IDs.
    fn new(rng: &mut TestRng) -> Self {
        let validators: BTreeSet<NodeId> = (0..VALIDATOR_COUNT)
            .map(|_| NodeId::random(rng))
            .collect();
        let non_validators: BTreeSet<NodeId> = (0..NON_VALIDATOR_COUNT)
            .map(|_| NodeId::random(rng))
            .collect();
        // Combine both groups into a single shuffled peer list.
        let mut all_peers: Vec<NodeId> =
            Vec::with_capacity(VALIDATOR_COUNT + NON_VALIDATOR_COUNT);
        all_peers.extend(validators.iter().copied());
        all_peers.extend(non_validators.iter().copied());
        all_peers.shuffle(rng);
        Fixture {
            validators,
            non_validators,
            all_peers,
        }
    }
    /// Returns a predicate reporting whether a node counts as a validator
    /// (the era argument is ignored).
    fn is_validator_in_era(&self) -> impl Fn(EraId, &NodeId) -> bool + '_ {
        move |_era_id: EraId, node_id: &NodeId| self.validators.contains(node_id)
    }
    /// Counts how many of the given nodes are validators.
    fn num_validators<'a>(&self, input: impl Iterator<Item = &'a NodeId>) -> usize {
        input.filter(|node_id| self.validators.contains(*node_id)).count()
    }
    /// Counts how many of the given nodes are non-validators.
    fn num_non_validators<'a>(&self, input: impl Iterator<Item = &'a NodeId>) -> usize {
        input.filter(|node_id| self.non_validators.contains(*node_id)).count()
    }
}
#[test]
fn should_choose_mixed() {
const TARGET: GossipTarget = GossipTarget::Mixed(EraId::new(1));
let mut rng = TestRng::new();
let fixture = Fixture::new(&mut rng);
// Requesting more than the total peer count returns every peer.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT + NON_VALIDATOR_COUNT + 1,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), fixture.all_peers.len());
// Requesting exactly the total peer count also returns every peer.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT + NON_VALIDATOR_COUNT,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), fixture.all_peers.len());
// Requesting twice the validator count yields all validators plus an
// equal number of non-validators.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
2 * VALIDATOR_COUNT,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), 2 * VALIDATOR_COUNT);
assert_eq!(fixture.num_validators(chosen.iter()), VALIDATOR_COUNT);
assert_eq!(fixture.num_non_validators(chosen.iter()), VALIDATOR_COUNT);
// Requesting `VALIDATOR_COUNT` peers yields an even split.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), VALIDATOR_COUNT);
assert_eq!(fixture.num_validators(chosen.iter()), VALIDATOR_COUNT / 2);
assert_eq!(
fixture.num_non_validators(chosen.iter()),
VALIDATOR_COUNT / 2
);
// Requesting two peers yields one from each group.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
2,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), 2);
assert_eq!(fixture.num_validators(chosen.iter()), 1);
assert_eq!(fixture.num_non_validators(chosen.iter()), 1);
// A single chosen peer should sometimes be a validator and sometimes
// not, since the starting group is picked at random.
let mut got_validator = false;
let mut got_non_validator = false;
let mut attempts = 0;
while !got_validator || !got_non_validator {
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
1,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), 1);
let node_id = chosen.iter().next().unwrap();
got_validator |= fixture.validators.contains(node_id);
got_non_validator |= fixture.non_validators.contains(node_id);
attempts += 1;
// Guard against an infinite loop if the randomness were broken.
assert!(attempts < 1_000_000);
}
// Excluding all but one validator: the shortfall is topped up from
// non-validators and excluded peers are never chosen.
let exclude: HashSet<_> = fixture
.validators
.iter()
.copied()
.take(VALIDATOR_COUNT - 1)
.collect();
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT,
&exclude,
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), VALIDATOR_COUNT);
assert_eq!(fixture.num_validators(chosen.iter()), 1);
assert_eq!(
fixture.num_non_validators(chosen.iter()),
VALIDATOR_COUNT - 1
);
assert!(exclude.is_disjoint(&chosen));
// Excluding every non-validator: only validators can be chosen.
let exclude: HashSet<_> = fixture.non_validators.iter().copied().collect();
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
3,
&exclude,
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), 3);
assert_eq!(fixture.num_validators(chosen.iter()), 3);
assert!(exclude.is_disjoint(&chosen));
}
#[test]
fn should_choose_all() {
const TARGET: GossipTarget = GossipTarget::All;
let mut rng = TestRng::new();
let fixture = Fixture::new(&mut rng);
// Requesting more than the total peer count returns every peer.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT + NON_VALIDATOR_COUNT + 1,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), fixture.all_peers.len());
// Requesting exactly the total peer count also returns every peer.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT + NON_VALIDATOR_COUNT,
&HashSet::new(),
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), fixture.all_peers.len());
// Offering only validators: all of them are chosen.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT,
&HashSet::new(),
fixture.validators.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), VALIDATOR_COUNT);
assert_eq!(fixture.num_validators(chosen.iter()), VALIDATOR_COUNT);
// Offering only non-validators: only non-validators are chosen.
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT,
&HashSet::new(),
fixture.non_validators.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), VALIDATOR_COUNT);
assert_eq!(fixture.num_non_validators(chosen.iter()), VALIDATOR_COUNT);
// Excluded peers are never chosen even when many are excluded.
let exclude: HashSet<_> = fixture
.all_peers
.iter()
.copied()
.take(NON_VALIDATOR_COUNT)
.collect();
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
VALIDATOR_COUNT,
&exclude,
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), VALIDATOR_COUNT);
assert!(exclude.is_disjoint(&chosen));
// With some non-validators excluded, repeatedly choosing a single peer
// should still eventually produce both a validator and a non-validator.
let exclude: HashSet<_> = fixture
.non_validators
.iter()
.copied()
.take(NON_VALIDATOR_COUNT - VALIDATOR_COUNT)
.collect();
let mut got_validator = false;
let mut got_non_validator = false;
let mut attempts = 0;
while !got_validator || !got_non_validator {
let chosen = choose_gossip_peers(
&mut rng,
TARGET,
1,
&exclude,
fixture.all_peers.iter().copied(),
fixture.is_validator_in_era(),
);
assert_eq!(chosen.len(), 1);
assert!(exclude.is_disjoint(&chosen));
let node_id = chosen.iter().next().unwrap();
got_validator |= fixture.validators.contains(node_id);
got_non_validator |= fixture.non_validators.contains(node_id);
attempts += 1;
// Guard against an infinite loop if the randomness were broken.
assert!(attempts < 1_000_000);
}
}
}