use std::{
collections::{hash_map::Entry, HashMap},
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use fake_instant::FakeClock as Instant;
use futures::future::{BoxFuture, FutureExt};
use serde::Serialize;
use tokio::time::{self, error::Elapsed};
use tracing::{debug, error_span};
use tracing_futures::Instrument;
use casper_types::testing::TestRng;
use casper_types::{Chainspec, ChainspecRawBytes};
use super::ConditionCheckReactor;
use crate::{
components::ComponentState,
effect::{EffectBuilder, Effects},
reactor::{Finalize, Reactor, Runner, TryCrankOutcome},
tls::KeyFingerprint,
types::{ExitCode, NodeId},
utils::Loadable,
NodeRng,
};
pub(crate) type Nodes<R> = HashMap<NodeId, Runner<ConditionCheckReactor<R>>>;
pub(crate) trait NetworkedReactor: Sized {
fn node_id(&self) -> NodeId {
#[allow(trivial_casts)]
let addr = self as *const _ as usize;
let mut raw: [u8; KeyFingerprint::LENGTH] = [0; KeyFingerprint::LENGTH];
raw[0..(size_of::<usize>())].copy_from_slice(&addr.to_be_bytes());
NodeId::from(KeyFingerprint::from(raw))
}
}
const POLL_INTERVAL: Duration = Duration::from_millis(10);
#[derive(Debug, Default)]
pub(crate) struct TestingNetwork<R: Reactor + NetworkedReactor> {
nodes: HashMap<NodeId, Runner<ConditionCheckReactor<R>>>,
}
impl<R> TestingNetwork<R>
where
R: Reactor + NetworkedReactor,
R::Config: Default,
<R as Reactor>::Error: Debug,
R::Event: Serialize,
R::Error: From<prometheus::Error>,
{
pub(crate) async fn add_node<'a, 'b: 'a>(
&'a mut self,
rng: &'b mut TestRng,
) -> Result<(NodeId, &'a mut Runner<ConditionCheckReactor<R>>), R::Error> {
self.add_node_with_config(Default::default(), rng).await
}
pub(crate) async fn add_nodes(&mut self, rng: &mut TestRng, count: usize) -> Vec<NodeId> {
let mut node_ids = vec![];
for _ in 0..count {
let (node_id, _runner) = self.add_node(rng).await.unwrap();
node_ids.push(node_id);
}
node_ids
}
}
impl<R> TestingNetwork<R>
where
R: Reactor + NetworkedReactor,
R::Event: Serialize,
R::Error: From<prometheus::Error> + From<R::Error>,
{
pub(crate) fn new() -> Self {
TestingNetwork {
nodes: HashMap::new(),
}
}
pub(crate) async fn add_node_with_config<'a, 'b: 'a>(
&'a mut self,
cfg: R::Config,
rng: &'b mut NodeRng,
) -> Result<(NodeId, &'a mut Runner<ConditionCheckReactor<R>>), R::Error> {
let (chainspec, chainspec_raw_bytes) =
<(Chainspec, ChainspecRawBytes)>::from_resources("local");
self.add_node_with_config_and_chainspec(
cfg,
Arc::new(chainspec),
Arc::new(chainspec_raw_bytes),
rng,
)
.await
}
pub(crate) async fn add_node_with_config_and_chainspec<'a, 'b: 'a>(
&'a mut self,
cfg: R::Config,
chainspec: Arc<Chainspec>,
chainspec_raw_bytes: Arc<ChainspecRawBytes>,
rng: &'b mut NodeRng,
) -> Result<(NodeId, &'a mut Runner<ConditionCheckReactor<R>>), R::Error> {
let runner: Runner<ConditionCheckReactor<R>> =
Runner::new(cfg, chainspec, chainspec_raw_bytes, rng).await?;
let node_id = runner.reactor().node_id();
let node_ref = match self.nodes.entry(node_id) {
Entry::Occupied(_) => {
panic!("trying to insert a duplicate node {}", node_id)
}
Entry::Vacant(entry) => entry.insert(runner),
};
Ok((node_id, node_ref))
}
pub(crate) fn remove_node(
&mut self,
node_id: &NodeId,
) -> Option<Runner<ConditionCheckReactor<R>>> {
self.nodes.remove(node_id)
}
pub(crate) async fn crank(&mut self, node_id: &NodeId, rng: &mut TestRng) -> TryCrankOutcome {
let runner = self.nodes.get_mut(node_id).expect("should find node");
let node_id = runner.reactor().node_id();
runner
.try_crank(rng)
.instrument(error_span!("crank", node_id = %node_id))
.await
}
pub(crate) async fn crank_until<F>(
&mut self,
node_id: &NodeId,
rng: &mut TestRng,
condition: F,
within: Duration,
) where
F: Fn(&R::Event) -> bool + Send + 'static,
{
self.nodes
.get_mut(node_id)
.unwrap()
.crank_until(rng, condition, within)
.await
}
async fn crank_all(&mut self, rng: &mut TestRng) -> usize {
let mut event_count = 0;
for node in self.nodes.values_mut() {
let node_id = node.reactor().node_id();
match node
.try_crank(rng)
.instrument(error_span!("crank", node_id = %node_id))
.await
{
TryCrankOutcome::NoEventsToProcess => (),
TryCrankOutcome::ProcessedAnEvent => event_count += 1,
TryCrankOutcome::ShouldExit(exit_code) => {
panic!("should not exit: {:?}", exit_code)
}
TryCrankOutcome::Exited => unreachable!(),
}
}
event_count
}
pub(crate) async fn crank_all_until<F>(
&mut self,
node_id: &NodeId,
rng: &mut TestRng,
condition: F,
within: Duration,
) where
F: Fn(&R::Event) -> bool + Send + 'static,
{
self.nodes
.get_mut(node_id)
.unwrap()
.reactor_mut()
.set_condition_checker(Box::new(condition));
time::timeout(within, self.crank_and_check_all_indefinitely(node_id, rng))
.await
.unwrap()
}
async fn crank_and_check_all_indefinitely(
&mut self,
node_to_check: &NodeId,
rng: &mut TestRng,
) {
loop {
let mut no_events = true;
for node in self.nodes.values_mut() {
let node_id = node.reactor().node_id();
match node
.try_crank(rng)
.instrument(error_span!("crank", node_id = %node_id))
.await
{
TryCrankOutcome::NoEventsToProcess => (),
TryCrankOutcome::ProcessedAnEvent => {
no_events = false;
}
TryCrankOutcome::ShouldExit(exit_code) => {
panic!("should not exit: {:?}", exit_code)
}
TryCrankOutcome::Exited => unreachable!(),
}
if node_id == *node_to_check && node.reactor().condition_result() {
debug!("{} met condition", node_to_check);
return;
}
}
if no_events {
Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
time::sleep(POLL_INTERVAL).await;
continue;
}
}
}
pub(crate) async fn settle(
&mut self,
rng: &mut TestRng,
quiet_for: Duration,
within: Duration,
) {
time::timeout(within, self.settle_indefinitely(rng, quiet_for))
.await
.unwrap_or_else(|_| {
panic!(
"network did not settle for {:?} within {:?}",
quiet_for, within
)
})
}
async fn settle_indefinitely(&mut self, rng: &mut TestRng, quiet_for: Duration) {
let mut no_events = false;
loop {
if self.crank_all(rng).await == 0 {
if no_events {
debug!("network has been quiet for {:?}", quiet_for);
break;
} else {
no_events = true;
Instant::advance_time(quiet_for.as_millis() as u64);
time::sleep(quiet_for).await;
}
} else {
no_events = false;
}
}
}
pub(crate) async fn try_settle_on<F>(
&mut self,
rng: &mut TestRng,
condition: F,
within: Duration,
) -> Result<(), Elapsed>
where
F: Fn(&Nodes<R>) -> bool,
{
time::timeout(within, self.settle_on_indefinitely(rng, condition)).await
}
pub(crate) async fn settle_on<F>(&mut self, rng: &mut TestRng, condition: F, within: Duration)
where
F: Fn(&Nodes<R>) -> bool,
{
self.try_settle_on(rng, condition, within)
.await
.unwrap_or_else(|_| {
panic!(
"network did not settle on condition within {} seconds",
within.as_secs_f64()
)
})
}
async fn settle_on_indefinitely<F>(&mut self, rng: &mut TestRng, condition: F)
where
F: Fn(&Nodes<R>) -> bool,
{
loop {
if condition(&self.nodes) {
debug!("network settled on meeting condition");
break;
}
if self.crank_all(rng).await == 0 {
Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
time::sleep(POLL_INTERVAL).await;
}
}
}
pub(crate) async fn settle_on_exit(
&mut self,
rng: &mut TestRng,
expected: ExitCode,
within: Duration,
) {
time::timeout(within, self.settle_on_exit_indefinitely(rng, expected))
.await
.unwrap_or_else(|_| panic!("network did not settle on condition within {:?}", within))
}
pub(crate) async fn settle_on_node_exit(
&mut self,
rng: &mut TestRng,
node_id: &NodeId,
expected: ExitCode,
within: Duration,
) {
time::timeout(
within,
self.settle_on_node_exit_indefinitely(rng, node_id, expected),
)
.await
.unwrap_or_else(|elapsed| {
panic!(
"network did not settle on condition within {within:?}, time elapsed: {elapsed:?}",
)
})
}
pub(crate) async fn _settle_on_component_state(
&mut self,
rng: &mut TestRng,
name: &str,
state: &ComponentState,
timeout: Duration,
) {
self.settle_on(
rng,
|net| {
net.values()
.all(|runner| match runner.reactor().get_component_state(name) {
Some(actual_state) => actual_state == state,
None => panic!("unknown or unsupported component: {}", name),
})
},
timeout,
)
.await;
}
pub(crate) fn crank_until_stopped(
mut self,
mut rng: TestRng,
) -> impl futures::Future<Output = (Self, TestRng)>
where
R: Send + 'static,
{
let stop = Arc::new(AtomicBool::new(false));
let handle = tokio::spawn({
let stop = stop.clone();
async move {
while !stop.load(Ordering::Relaxed) {
if self.crank_all(&mut rng).await == 0 {
time::sleep(POLL_INTERVAL).await;
};
}
(self, rng)
}
});
async move {
stop.store(true, Ordering::Relaxed);
handle.await.expect("failed to join background crank")
}
}
async fn settle_on_exit_indefinitely(&mut self, rng: &mut TestRng, expected: ExitCode) {
let mut exited_as_expected = 0;
loop {
if exited_as_expected == self.nodes.len() {
debug!(?expected, "all nodes exited with expected code");
break;
}
let mut event_count = 0;
for node in self.nodes.values_mut() {
let node_id = node.reactor().node_id();
match node
.try_crank(rng)
.instrument(error_span!("crank", node_id = %node_id))
.await
{
TryCrankOutcome::NoEventsToProcess => (),
TryCrankOutcome::ProcessedAnEvent => event_count += 1,
TryCrankOutcome::ShouldExit(exit_code) if exit_code == expected => {
exited_as_expected += 1;
event_count += 1;
}
TryCrankOutcome::ShouldExit(exit_code) => {
panic!(
"unexpected exit: expected {:?}, got {:?}",
expected, exit_code
)
}
TryCrankOutcome::Exited => (),
}
}
if event_count == 0 {
Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
time::sleep(POLL_INTERVAL).await;
}
}
}
async fn settle_on_node_exit_indefinitely(
&mut self,
rng: &mut TestRng,
node_id: &NodeId,
expected: ExitCode,
) {
'outer: loop {
let mut event_count = 0;
for node in self.nodes.values_mut() {
let current_node_id = node.reactor().node_id();
match node
.try_crank(rng)
.instrument(error_span!("crank", node_id = %node_id))
.await
{
TryCrankOutcome::NoEventsToProcess => (),
TryCrankOutcome::ProcessedAnEvent => event_count += 1,
TryCrankOutcome::ShouldExit(exit_code)
if (exit_code == expected && current_node_id == *node_id) =>
{
debug!(?expected, ?node_id, "node exited with expected code");
break 'outer;
}
TryCrankOutcome::ShouldExit(exit_code) => {
panic!(
"unexpected exit: expected {expected:?} for node {node_id:?}, got {exit_code:?} for node {current_node_id:?}",
)
}
TryCrankOutcome::Exited => (),
}
}
if event_count == 0 {
Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
time::sleep(POLL_INTERVAL).await;
}
}
}
pub(crate) fn nodes(&self) -> &HashMap<NodeId, Runner<ConditionCheckReactor<R>>> {
&self.nodes
}
pub(crate) fn nodes_mut(&mut self) -> &mut HashMap<NodeId, Runner<ConditionCheckReactor<R>>> {
&mut self.nodes
}
pub(crate) fn runners_mut(
&mut self,
) -> impl Iterator<Item = &mut Runner<ConditionCheckReactor<R>>> {
self.nodes.values_mut()
}
pub(crate) fn reactors_mut(&mut self) -> impl Iterator<Item = &mut R> {
self.runners_mut()
.map(|runner| runner.reactor_mut().inner_mut())
}
pub(crate) async fn process_injected_effect_on<F>(
&mut self,
node_id: &NodeId,
create_effects: F,
) where
F: FnOnce(EffectBuilder<R::Event>) -> Effects<R::Event>,
{
let runner = self.nodes.get_mut(node_id).unwrap();
let node_id = runner.reactor().node_id();
runner
.process_injected_effects(create_effects)
.instrument(error_span!("inject", node_id = %node_id))
.await
}
}
impl<R> Finalize for TestingNetwork<R>
where
R: Finalize + NetworkedReactor + Reactor + Send + 'static,
R::Event: Serialize + Send + Sync,
R::Error: From<prometheus::Error>,
{
fn finalize(self) -> BoxFuture<'static, ()> {
async move {
for (_, node) in self.nodes.into_iter() {
node.drain_into_inner().await.finalize().await;
}
debug!("network finalized");
}
.boxed()
}
}