use std::{
collections::{hash_map::Entry, HashMap},
fmt::{Debug, Display},
hash::Hash,
time::Duration,
};
use fake_instant::FakeClock as Instant;
use futures::future::{BoxFuture, FutureExt};
use serde::Serialize;
use tokio::time;
use tracing::{debug, error_span};
use tracing_futures::Instrument;
use super::ConditionCheckReactor;
use crate::{
effect::{EffectBuilder, Effects},
reactor::{Finalize, Reactor, Runner},
testing::TestRng,
NodeRng,
};
/// Map from a node's ID to the runner driving that node's reactor, where the reactor is wrapped
/// in a `ConditionCheckReactor`.
pub type Nodes<R> = HashMap<<R as NetworkedReactor>::NodeId, Runner<ConditionCheckReactor<R>>>;
/// A reactor that can report the ID of the node it belongs to.
pub trait NetworkedReactor: Sized {
    /// The type used to identify a node.
    ///
    /// It must be usable as a hash-map key (`Eq + Hash`), cloneable, and printable with both
    /// `Display` and `Debug`.
    type NodeId: Clone + Debug + Display + Eq + Hash;

    /// Returns the ID of the node this reactor belongs to.
    fn node_id(&self) -> Self::NodeId;
}
/// Interval used while waiting on a condition: when a crank processes no events, the fake clock
/// is advanced by this amount and a real delay of the same length is awaited before retrying.
const POLL_INTERVAL: Duration = Duration::from_millis(10);
/// A collection of reactor runners, indexed by the ID of the node each runner drives.
///
/// Every reactor is wrapped in a `ConditionCheckReactor`.
#[derive(Debug, Default)]
pub struct Network<R>
where
    R: Reactor + NetworkedReactor,
{
    /// The runners of all nodes currently held by the network, keyed by node ID.
    nodes: HashMap<<R as NetworkedReactor>::NodeId, Runner<ConditionCheckReactor<R>>>,
}
impl<R> Network<R>
where
R: Reactor + NetworkedReactor,
R::Config: Default,
<R as Reactor>::Error: Debug,
R::Event: Serialize,
R::Error: From<prometheus::Error>,
{
pub async fn add_node(
&mut self,
rng: &mut TestRng,
) -> Result<(R::NodeId, &mut Runner<ConditionCheckReactor<R>>), R::Error> {
self.add_node_with_config(Default::default(), rng).await
}
pub async fn add_nodes(&mut self, rng: &mut TestRng, count: usize) -> Vec<R::NodeId> {
let mut node_ids = vec![];
for _ in 0..count {
let (node_id, _runner) = self.add_node(rng).await.unwrap();
node_ids.push(node_id);
}
node_ids
}
}
impl<R> Network<R>
where
R: Reactor + NetworkedReactor,
R::Event: Serialize,
R::Error: From<prometheus::Error> + From<R::Error>,
{
pub fn new() -> Self {
Network {
nodes: HashMap::new(),
}
}
/// Builds a runner from `cfg` and registers it under the node ID reported by its reactor.
///
/// Returns the node's ID and a mutable reference to the stored runner. If constructing the
/// runner fails, the error is returned and the network is left unchanged.
///
/// # Panics
///
/// Panics if a node with the same ID is already present.
pub async fn add_node_with_config(
    &mut self,
    cfg: R::Config,
    rng: &mut NodeRng,
) -> Result<(R::NodeId, &mut Runner<ConditionCheckReactor<R>>), R::Error> {
    let new_runner: Runner<ConditionCheckReactor<R>> = Runner::new(cfg, rng).await?;
    let id = new_runner.reactor().node_id();

    // Only a vacant slot is acceptable; an existing entry is never overwritten.
    let slot = match self.nodes.entry(id.clone()) {
        Entry::Vacant(vacant) => vacant,
        Entry::Occupied(_) => panic!("trying to insert a duplicate node {}", id),
    };
    let stored = slot.insert(new_runner);

    Ok((id, stored))
}
/// Removes the node with the given ID from the network.
///
/// Returns the node's runner if it was present, or `None` if there is no node with that ID.
pub fn remove_node(&mut self, node_id: &R::NodeId) -> Option<Runner<ConditionCheckReactor<R>>> {
self.nodes.remove(node_id)
}
/// Attempts a single crank of the given node's runner.
///
/// Returns `1` if `try_crank` yielded a value and `0` otherwise. The crank runs inside a
/// `crank` tracing span that carries the node's ID.
///
/// # Panics
///
/// Panics if no node with the given ID exists.
pub async fn crank(&mut self, node_id: &R::NodeId, rng: &mut TestRng) -> usize {
    let runner = self.nodes.get_mut(node_id).expect("should find node");
    let reported_id = runner.reactor().node_id();
    let span = error_span!("crank", node_id = %reported_id);

    let cranked = runner.try_crank(rng).instrument(span).await.is_some();
    usize::from(cranked)
}
/// Installs `condition` as the condition checker of the given node's reactor, then cranks that
/// node until the reactor reports the condition as met.
///
/// # Panics
///
/// Panics if no node with the given ID exists, or if the condition is not met within `within`.
pub async fn crank_until<F>(
    &mut self,
    node_id: &R::NodeId,
    rng: &mut TestRng,
    condition: F,
    within: Duration,
) where
    F: Fn(&R::Event) -> bool + Send + 'static,
{
    let runner = self.nodes.get_mut(node_id).unwrap();
    runner
        .reactor_mut()
        .set_condition_checker(Box::new(condition));

    let cranking = self.crank_and_check_indefinitely(node_id, rng);
    time::timeout(within, cranking).await.unwrap()
}
/// Cranks the given node in a loop, returning once a crank processed an event and the node's
/// reactor reports its condition as met.
///
/// Whenever a crank processes nothing, the fake clock is advanced by `POLL_INTERVAL` and a real
/// delay of the same length is awaited before trying again. There is no built-in time limit.
async fn crank_and_check_indefinitely(&mut self, node_id: &R::NodeId, rng: &mut TestRng) {
    loop {
        let processed = self.crank(node_id, rng).await;

        if processed != 0 {
            // The condition is only consulted after an event was actually processed.
            let condition_met = self
                .nodes
                .get(node_id)
                .unwrap()
                .reactor()
                .condition_result();
            if condition_met {
                debug!("{} met condition", node_id);
                return;
            }
        } else {
            Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
            time::delay_for(POLL_INTERVAL).await;
        }
    }
}
/// Attempts one crank on every node in the network.
///
/// Returns the number of nodes for which `try_crank` yielded a value. Each crank runs inside a
/// `crank` tracing span that carries the node's ID.
pub async fn crank_all(&mut self, rng: &mut TestRng) -> usize {
    let mut processed = 0;

    for runner in self.nodes.values_mut() {
        let node_id = runner.reactor().node_id();
        let span = error_span!("crank", node_id = %node_id);
        if runner.try_crank(rng).instrument(span).await.is_some() {
            processed += 1;
        }
    }

    processed
}
/// Cranks all nodes until the network has been quiet (no events processed) for `quiet_for`.
///
/// # Panics
///
/// Panics if the network does not settle within `within`.
pub async fn settle(&mut self, rng: &mut TestRng, quiet_for: Duration, within: Duration) {
    time::timeout(within, self.settle_indefinitely(rng, quiet_for))
        .await
        .unwrap_or_else(|_| {
            // Format directly in `panic!`; passing a pre-built `String` as the sole argument
            // is a deprecated form of the macro.
            panic!(
                "network did not settle for {:?} within {:?}",
                quiet_for, within
            )
        })
}
/// Cranks all nodes in a loop until two consecutive rounds process no events, with a wait of
/// `quiet_for` between those two rounds.
///
/// After the first quiet round the fake clock is advanced by `quiet_for` and a real delay of the
/// same length is awaited. Any round that processes events resets the quiet state. There is no
/// built-in time limit.
async fn settle_indefinitely(&mut self, rng: &mut TestRng, quiet_for: Duration) {
    let mut previous_round_quiet = false;

    loop {
        if self.crank_all(rng).await != 0 {
            // Activity observed: start looking for a quiet period from scratch.
            previous_round_quiet = false;
            continue;
        }

        if previous_round_quiet {
            debug!("network has been quiet for {:?}", quiet_for);
            break;
        }

        previous_round_quiet = true;
        Instant::advance_time(quiet_for.as_millis() as u64);
        time::delay_for(quiet_for).await;
    }
}
/// Cranks all nodes until `condition`, evaluated on the map of nodes, returns `true`.
///
/// # Panics
///
/// Panics if the condition is not met within `within`.
pub async fn settle_on<F>(&mut self, rng: &mut TestRng, condition: F, within: Duration)
where
    F: Fn(&Nodes<R>) -> bool,
{
    time::timeout(within, self.settle_on_indefinitely(rng, condition))
        .await
        .unwrap_or_else(|_| {
            // Format directly in `panic!`; passing a pre-built `String` as the sole argument
            // is a deprecated form of the macro.
            panic!("network did not settle on condition within {:?}", within)
        })
}
/// Cranks all nodes in a loop until `condition`, evaluated on the map of nodes, returns `true`.
///
/// The condition is checked before every round of cranking. When a round processes no events,
/// the fake clock is advanced by `POLL_INTERVAL` and a real delay of the same length is awaited.
/// There is no built-in time limit.
async fn settle_on_indefinitely<F>(&mut self, rng: &mut TestRng, condition: F)
where
    F: Fn(&Nodes<R>) -> bool,
{
    while !condition(&self.nodes) {
        let processed = self.crank_all(rng).await;
        if processed == 0 {
            Instant::advance_time(POLL_INTERVAL.as_millis() as u64);
            time::delay_for(POLL_INTERVAL).await;
        }
    }

    debug!("network settled on meeting condition");
}
/// Returns a reference to the map of all nodes in the network, keyed by node ID.
pub fn nodes(&self) -> &HashMap<R::NodeId, Runner<ConditionCheckReactor<R>>> {
&self.nodes
}
/// Passes `create_effects` to the given node's runner via `process_injected_effects` and awaits
/// the result, inside an `inject` tracing span that carries the node's ID.
///
/// # Panics
///
/// Panics if no node with the given ID exists.
pub async fn process_injected_effect_on<F>(&mut self, node_id: &R::NodeId, create_effects: F)
where
    F: FnOnce(EffectBuilder<R::Event>) -> Effects<R::Event>,
{
    let runner = self.nodes.get_mut(node_id).unwrap();

    let span = {
        let node_id = runner.reactor().node_id();
        error_span!("inject", node_id = %node_id)
    };

    let injection = runner.process_injected_effects(create_effects);
    injection.instrument(span).await
}
}
impl<R> Finalize for Network<R>
where
    R: Finalize + NetworkedReactor + Reactor + Send + 'static,
    R::Event: Serialize,
    R::NodeId: Send,
    R::Error: From<prometheus::Error>,
{
    /// Consumes the network and finalizes its nodes one after another, awaiting each node's
    /// finalization before moving on to the next.
    fn finalize(self) -> BoxFuture<'static, ()> {
        let finalize_all = async move {
            for (_, runner) in self.nodes {
                let reactor = runner.into_inner();
                reactor.finalize().await;
            }
            debug!("network finalized");
        };

        finalize_all.boxed()
    }
}