use std::{
collections::{HashMap, HashSet},
env,
fmt::{self, Debug, Display, Formatter},
time::{Duration, Instant},
};
use derive_more::From;
use pnet::datalink;
use prometheus::Registry;
use serde::Serialize;
use tracing::{debug, info};
use super::{Config, Event as NetworkEvent, Network as NetworkComponent, ENABLE_LIBP2P_ENV_VAR};
use crate::{
components::{chainspec_loader::Chainspec, Component},
effect::{
announcements::NetworkAnnouncement, requests::NetworkRequest, EffectBuilder, Effects,
},
protocol,
reactor::{self, EventQueueHandle, Finalize, Reactor, Runner},
testing::{
self, init_logging,
network::{Network, NetworkedReactor},
ConditionCheckReactor,
},
types::NodeId,
NodeRng,
};
/// Top-level event type of the test reactor.
///
/// Each variant wraps one kind of networking-related event using `String` as the payload type.
/// The wrapped values are excluded from serialization via `#[serde(skip_serializing)]`.
#[derive(Debug, From, Serialize)]
enum Event {
    /// An event addressed to the network component itself.
    #[from]
    Network(#[serde(skip_serializing)] NetworkEvent<String>),

    /// A request directed at the network component.
    #[from]
    NetworkRequest(#[serde(skip_serializing)] NetworkRequest<NodeId, String>),

    /// An announcement emitted by the network component.
    #[from]
    NetworkAnnouncement(#[serde(skip_serializing)] NetworkAnnouncement<NodeId, String>),
}
/// Conversion from network requests carrying `protocol::Message` payloads.
///
/// This conversion always panics when invoked.
// NOTE(review): presumably this impl only exists to satisfy a trait bound required elsewhere;
// the test reactor itself works with `String` payloads -- confirm against the component's bounds.
impl From<NetworkRequest<NodeId, protocol::Message>> for Event {
    fn from(_: NetworkRequest<NodeId, protocol::Message>) -> Self {
        unreachable!()
    }
}
/// `Display` for test events simply reuses the derived `Debug` representation.
impl Display for Event {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        // Delegate directly (rather than via `write!`) so the formatter is handed through as-is.
        <Self as Debug>::fmt(self, formatter)
    }
}
/// Minimal reactor used by the tests in this file.
///
/// It owns nothing but a single network component exchanging `String` payloads.
#[derive(Debug)]
struct TestReactor {
    /// The network component under test.
    network_component: NetworkComponent<Event, String>,
}
impl Reactor for TestReactor {
    type Event = Event;
    type Config = Config;
    type Error = anyhow::Error;

    /// Builds the reactor around a freshly created network component.
    ///
    /// A random chainspec is generated from `rng` and handed to the component constructor. The
    /// metrics registry is not used. Effects returned by the component are wrapped into
    /// `Event::Network`.
    fn new(
        config: Self::Config,
        _registry: &Registry,
        event_queue: EventQueueHandle<Self::Event>,
        rng: &mut NodeRng,
    ) -> anyhow::Result<(Self, Effects<Self::Event>)> {
        let chainspec = Chainspec::random(rng);

        // NOTE(review): the meaning of the trailing `false` flag is defined by
        // `NetworkComponent::new`, which is not visible here.
        let (network_component, network_effects) =
            NetworkComponent::new(event_queue, config, &chainspec, false)?;

        let test_reactor = TestReactor { network_component };
        let wrapped_effects = reactor::wrap_effects(Event::Network, network_effects);
        Ok((test_reactor, wrapped_effects))
    }

    /// Routes an event to the network component or handles it in place.
    ///
    /// * `Event::Network` events are passed to the component unchanged.
    /// * `Event::NetworkRequest` events are converted into network events and passed on.
    /// * Of the announcements, only `NewPeer` is tolerated (and ignored); `MessageReceived` hits
    ///   a `todo!` and `GossipOurAddress` is treated as unreachable, so both panic.
    fn dispatch_event(
        &mut self,
        effect_builder: EffectBuilder<Self::Event>,
        rng: &mut NodeRng,
        event: Self::Event,
    ) -> Effects<Self::Event> {
        match event {
            Event::Network(network_event) => {
                let component_effects =
                    self.network_component
                        .handle_event(effect_builder, rng, network_event);
                reactor::wrap_effects(Event::Network, component_effects)
            }
            Event::NetworkRequest(request) => {
                // Requests become regular network events before reaching the component.
                let network_event = NetworkEvent::from(request);
                let component_effects =
                    self.network_component
                        .handle_event(effect_builder, rng, network_event);
                reactor::wrap_effects(Event::Network, component_effects)
            }
            Event::NetworkAnnouncement(announcement) => match announcement {
                NetworkAnnouncement::MessageReceived { sender, payload } => {
                    todo!("{} -- {}", sender, payload)
                }
                NetworkAnnouncement::GossipOurAddress(_gossiped_address) => unreachable!(),
                NetworkAnnouncement::NewPeer(_) => Effects::new(),
            },
        }
    }
}
impl NetworkedReactor for TestReactor {
    type NodeId = NodeId;

    /// Returns the node ID reported by the wrapped network component.
    fn node_id(&self) -> Self::NodeId {
        let component = &self.network_component;
        component.node_id()
    }
}
impl Finalize for TestReactor {
fn finalize(self) -> futures::future::BoxFuture<'static, ()> {
self.network_component.finalize()
}
}
/// Checks whether the given set of nodes counts as a complete network.
///
/// * An empty node map is never complete.
/// * A single node whose network component reports itself as isolated is complete.
/// * Otherwise, every node not listed in `blocklist` must have a non-empty peer set; blocklisted
///   nodes are not inspected at all.
fn network_is_complete(
    blocklist: &HashSet<NodeId>,
    nodes: &HashMap<NodeId, Runner<ConditionCheckReactor<TestReactor>>>,
) -> bool {
    if nodes.is_empty() {
        return false;
    }

    // Special case: a lone, isolated node has nobody to connect to.
    if let (1, Some(only_runner)) = (nodes.len(), nodes.values().next()) {
        if only_runner
            .reactor()
            .inner()
            .network_component
            .is_isolated()
        {
            return true;
        }
        // A lone node that is not isolated falls through to the general check below.
    }

    nodes
        .iter()
        .filter(|(node_id, _)| !blocklist.contains(*node_id))
        .all(|(_, runner)| !runner.reactor().inner().network_component.peers.is_empty())
}
/// Returns `true` if no node in `net` has an empty peer set.
fn network_started(net: &Network<TestReactor>) -> bool {
    let any_node_without_peers = net
        .nodes()
        .values()
        .any(|runner| runner.reactor().inner().network_component.peers.is_empty());
    !any_node_without_peers
}
#[tokio::test]
async fn run_two_node_network_five_times() {
if env::var(ENABLE_LIBP2P_ENV_VAR).is_err() {
return;
}
let mut rng = crate::new_rng();
let first_node_port = testing::unused_port_on_localhost() + 1;
init_logging();
for i in 0..5 {
info!("two-network test round {}", i);
let mut net = Network::new();
let start = Instant::now();
net.add_node_with_config(
Config::default_local_net_first_node(first_node_port),
&mut rng,
)
.await
.unwrap();
net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng)
.await
.unwrap();
let end = Instant::now();
debug!(
total_time_ms = (end - start).as_millis() as u64,
"finished setting up networking nodes"
);
let timeout = Duration::from_secs(20);
let blocklist = HashSet::new();
net.settle_on(
&mut rng,
|nodes| network_is_complete(&blocklist, nodes),
timeout,
)
.await;
assert!(
network_started(&net),
"each node is connected to at least one other node"
);
let quiet_for = Duration::from_millis(25);
let timeout = Duration::from_secs(2);
net.settle(&mut rng, quiet_for, timeout).await;
assert!(
network_is_complete(&blocklist, net.nodes()),
"network did not stay connected"
);
net.finalize().await;
}
}
/// Starts a single node bound to the first IP of a non-loopback network interface and waits for
/// the resulting one-node network to be reported as complete.
///
/// The test returns immediately (and thus passes) unless the environment variable named by
/// `ENABLE_LIBP2P_ENV_VAR` can be read. It panics if no suitable interface exists.
#[tokio::test]
async fn bind_to_real_network_interface() {
    if env::var(ENABLE_LIBP2P_ENV_VAR).is_err() {
        return;
    }

    init_logging();

    let mut rng = crate::new_rng();

    // Pick the first interface that has addresses, none of which is a loopback address.
    let iface = datalink::interfaces()
        .into_iter()
        .find(|candidate| {
            !candidate.ips.is_empty() && candidate.ips.iter().all(|ip| !ip.ip().is_loopback())
        })
        .expect("could not find a single networking interface that isn't localhost");

    let local_addr = iface
        .ips
        .into_iter()
        .next()
        .expect("found a interface with no ips")
        .ip();
    let port = testing::unused_port_on_localhost();

    // NOTE(review): the meaning of the `true` flag is defined by `Config::new`, which is not
    // visible here.
    let local_net_config = Config::new((local_addr, port).into(), true);

    let mut net = Network::<TestReactor>::new();
    net.add_node_with_config(local_net_config, &mut rng)
        .await
        .unwrap();

    // Run the lone node until the network is considered complete or the timeout is hit.
    let blocklist = HashSet::new();
    let timeout = Duration::from_secs(2);
    net.settle_on(
        &mut rng,
        |nodes| network_is_complete(&blocklist, nodes),
        timeout,
    )
    .await;

    net.finalize().await;
}
/// Builds networks of 2, 3, 5, 9 and 15 nodes and checks that each one becomes complete.
///
/// Every network gets a timeout of three seconds per node. The test returns immediately (and
/// thus passes) unless the environment variable named by `ENABLE_LIBP2P_ENV_VAR` can be read.
#[tokio::test]
async fn check_varying_size_network_connects() {
    if env::var(ENABLE_LIBP2P_ENV_VAR).is_err() {
        return;
    }

    init_logging();

    let mut rng = crate::new_rng();

    for node_count in [2u16, 3, 5, 9, 15].iter().copied() {
        let timeout = Duration::from_secs(3 * u64::from(node_count));

        let mut net = Network::new();

        // The root node comes first; a fresh port is requested for every network size.
        let first_node_port = testing::unused_port_on_localhost();
        net.add_node_with_config(
            Config::default_local_net_first_node(first_node_port),
            &mut rng,
        )
        .await
        .unwrap();

        // All remaining nodes are configured with the root node's port.
        for _ in 1..node_count {
            net.add_node_with_config(Config::default_local_net(first_node_port), &mut rng)
                .await
                .unwrap();
        }

        // No node is excluded from the completeness check.
        let blocklist = HashSet::new();
        net.settle_on(
            &mut rng,
            |nodes| network_is_complete(&blocklist, nodes),
            timeout,
        )
        .await;

        // Re-check after settling finished.
        assert!(
            network_is_complete(&blocklist, net.nodes()),
            "network did not stay connected after being settled"
        );

        net.finalize().await;
    }
}