use super::*;
const B1_SYNC_TIME: std::time::Duration = std::time::Duration::from_secs(120);
struct Loc1PubData {
pub_time: std::time::Instant,
author: usize,
found_shallow: std::collections::HashSet<usize>,
found_full: std::collections::HashSet<usize>,
}
pub struct LocalBehavior1 {
runner: Option<HcStressTestRunner<Self>>,
start_at: std::time::Instant,
large_publish_count: usize,
small_publish_count: usize,
shallow_found_in_time: usize,
shallow_found_later: usize,
full_found_in_time: usize,
full_found_later: usize,
shallow_validate_nodes: std::collections::HashMap<usize, std::time::Instant>,
full_validate_nodes: std::collections::HashMap<usize, std::time::Instant>,
pub_data: std::collections::HashMap<ActionHash, Loc1PubData>,
}
impl std::fmt::Debug for LocalBehavior1 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LocalBehavior1")
.field("runtime", &self.start_at.elapsed())
.field("large_publish_count", &self.large_publish_count)
.field("small_publish_count", &self.small_publish_count)
.field("shallow_found_in_time", &self.shallow_found_in_time)
.field("shallow_found_later", &self.shallow_found_later)
.field("full_found_in_time", &self.full_found_in_time)
.field("full_found_later", &self.full_found_later)
.finish()
}
}
impl Report for LocalBehavior1 {
fn spawn(&mut self, _node_id: usize) {}
fn shutdown(&mut self, node_id: usize, _runtime: std::time::Duration) {
self.validate();
self.shallow_validate_nodes.remove(&node_id);
self.full_validate_nodes.remove(&node_id);
}
fn publish(
&mut self,
node_id: usize,
_runtime: std::time::Duration,
byte_count: usize,
hash: ActionHash,
) {
if byte_count > 1024 {
self.large_publish_count += 1;
} else {
self.small_publish_count += 1;
}
self.pub_data.insert(
hash,
Loc1PubData {
pub_time: std::time::Instant::now(),
author: node_id,
found_shallow: std::collections::HashSet::new(),
found_full: std::collections::HashSet::new(),
},
);
}
fn fetch_shallow(
&mut self,
node_id: usize,
_runtime: std::time::Duration,
hash_list: Vec<ActionHash>,
) {
for hash in hash_list {
if let Some(pub_data) = self.pub_data.get_mut(&hash) {
if node_id != pub_data.author && pub_data.found_shallow.insert(node_id) {
if pub_data.pub_time.elapsed() > B1_SYNC_TIME {
self.shallow_found_later += 1;
} else {
self.shallow_found_in_time += 1;
}
}
}
}
}
fn fetch_full(&mut self, node_id: usize, _runtime: std::time::Duration, hash: ActionHash) {
if let Some(pub_data) = self.pub_data.get_mut(&hash) {
if node_id != pub_data.author && pub_data.found_full.insert(node_id) {
if pub_data.pub_time.elapsed() > B1_SYNC_TIME {
self.full_found_later += 1;
} else {
self.full_found_in_time += 1;
}
}
}
}
}
impl LocalBehavior1 {
pub fn new() -> Arc<Mutex<Self>> {
let this = Arc::new(Mutex::new(Self {
runner: None,
start_at: std::time::Instant::now(),
large_publish_count: 0,
small_publish_count: 0,
shallow_found_in_time: 0,
shallow_found_later: 0,
full_found_in_time: 0,
full_found_later: 0,
shallow_validate_nodes: std::collections::HashMap::new(),
full_validate_nodes: std::collections::HashMap::new(),
pub_data: std::collections::HashMap::new(),
}));
let runner = HcStressTestRunner::new(this.clone());
this.lock().unwrap().runner = Some(runner);
{
let this = this.clone();
tokio::task::spawn(async move {
let network_seed = random_network_seed();
let rendezvous = SweetLocalRendezvous::new().await;
println!("spawn 1 null node that shuts down after ~30 s");
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
SHUTDOWN_30_S,
vec![(0, BehaviorPublish::None)],
vec![(0, BehaviorQuery::None)],
);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("spawn 1 full query large publisher, pub every ~5 m, query every ~15 s");
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
let node_id = this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
BehaviorLifetime::Forever,
vec![(0, PUBLISH_LARGE_5_M)],
vec![(0, QUERY_FULL_15_S)],
);
this.lock()
.unwrap()
.shallow_validate_nodes
.insert(node_id, std::time::Instant::now());
this.lock()
.unwrap()
.full_validate_nodes
.insert(node_id, std::time::Instant::now());
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("spawn 1 full query small publisher, pub every ~1 m, query every ~15 s");
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
let node_id = this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
BehaviorLifetime::Forever,
vec![(0, PUBLISH_SMALL_1_M)],
vec![(0, QUERY_FULL_15_S)],
);
this.lock()
.unwrap()
.shallow_validate_nodes
.insert(node_id, std::time::Instant::now());
this.lock()
.unwrap()
.full_validate_nodes
.insert(node_id, std::time::Instant::now());
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!(
"spawn 1 shallow query small publisher, pub every ~1 m, query every ~15 s"
);
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
let node_id = this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
BehaviorLifetime::Forever,
vec![(0, PUBLISH_SMALL_1_M)],
vec![(0, QUERY_SHALLOW_15_S)],
);
this.lock()
.unwrap()
.shallow_validate_nodes
.insert(node_id, std::time::Instant::now());
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("spawn 1 small single publisher that shuts down after ~3 m");
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
SHUTDOWN_3_M,
vec![(0, PUBLISH_SMALL_SINGLE)],
vec![(0, BehaviorQuery::None)],
);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("spawn 1 large single publisher that shuts down after ~3 m");
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
SHUTDOWN_3_M,
vec![(0, PUBLISH_LARGE_SINGLE)],
vec![(0, BehaviorQuery::None)],
);
loop {
for _ in 0..6 {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
this.lock().unwrap().validate();
}
println!("spawn shallow query only node that shuts down after ~3 m, query every ~15 s");
let node = loc_test_conductor(network_seed.clone(), rendezvous.clone()).await;
let node_id = this.lock().unwrap().runner.as_ref().unwrap().add_node(
node,
SHUTDOWN_3_M,
vec![(0, BehaviorPublish::None)],
vec![(0, QUERY_SHALLOW_15_S)],
);
this.lock()
.unwrap()
.shallow_validate_nodes
.insert(node_id, std::time::Instant::now());
}
});
}
this
}
fn validate(&mut self) {
for (_, pub_data) in self.pub_data.iter() {
if pub_data.pub_time.elapsed() > B1_SYNC_TIME
&& (pub_data.found_shallow.is_empty() || pub_data.found_full.is_empty())
{
panic!("published item was not queried by anyone in {B1_SYNC_TIME:?}!");
}
}
for (node_id, online) in self.shallow_validate_nodes.iter() {
if online.elapsed() < B1_SYNC_TIME {
continue;
}
for (hash, pub_data) in self.pub_data.iter() {
if pub_data.pub_time.elapsed() > B1_SYNC_TIME
&& pub_data.author != *node_id
&& !pub_data.found_shallow.contains(node_id)
{
panic!(
"node {node_id} could not shallow discover {hash} within {B1_SYNC_TIME:?}"
);
}
}
}
for (node_id, online) in self.full_validate_nodes.iter() {
if online.elapsed() < B1_SYNC_TIME {
continue;
}
for (hash, pub_data) in self.pub_data.iter() {
if pub_data.pub_time.elapsed() > B1_SYNC_TIME
&& pub_data.author != *node_id
&& !pub_data.found_full.contains(node_id)
{
panic!("node {node_id} could not full get {hash} within {B1_SYNC_TIME:?}");
}
}
}
}
}
async fn loc_test_conductor(network_seed: String, rendezvous: DynSweetRendezvous) -> HcStressTest {
let config = SweetConductorConfig::rendezvous(true);
let conductor = SweetConductor::from_config_rendezvous(config, rendezvous).await;
let dna = HcStressTest::test_dna(network_seed).await;
HcStressTest::new(conductor, &[dna]).await
}