use std::net::SocketAddr;
use futures::FutureExt;
use proptest::prelude::*;
use tower::{discover::Discover, BoxError, ServiceExt};
use zebra_chain::{
block, chain_tip::ChainTip, parameters::Network, serialization::ZcashDeserializeInto,
};
use crate::{
constants::CURRENT_NETWORK_PROTOCOL_VERSION,
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion, ReceiveRequestAttempt},
peer_set::PeerSet,
protocol::external::types::Version,
PeerSocketAddr, Request,
};
use super::{BlockHeightPairAcrossNetworkUpgrades, PeerSetBuilder, PeerVersions};
proptest! {
#[test]
fn only_non_outdated_peers_are_accepted(
network in any::<Network>(),
block_height in any::<block::Height>(),
peer_versions in any::<PeerVersions>(),
) {
let (runtime, _init_guard) = zebra_test::init_async();
let (mut minimum_peer_version, best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&network);
best_tip_height.send_best_tip_height(block_height);
let current_minimum_version = minimum_peer_version.current();
runtime.block_on(async move {
let (discovered_peers, mut harnesses) = peer_versions.mock_peer_discovery();
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version)
.max_conns_per_ip(usize::MAX)
.build();
check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut harnesses,
current_minimum_version,
)?;
Ok::<_, TestCaseError>(())
})?;
}
#[test]
fn outdated_peers_are_dropped_on_network_upgrade(
block_heights in any::<BlockHeightPairAcrossNetworkUpgrades>(),
peer_versions in any::<PeerVersions>(),
) {
let (runtime, _init_guard) = zebra_test::init_async();
let (mut minimum_peer_version, best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&block_heights.network);
best_tip_height.send_best_tip_height(block_heights.before_upgrade);
runtime.block_on(async move {
let (discovered_peers, mut harnesses) = peer_versions.mock_peer_discovery();
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();
check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut harnesses,
minimum_peer_version.current(),
)?;
best_tip_height.send_best_tip_height(block_heights.after_upgrade);
check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut harnesses,
minimum_peer_version.current(),
)?;
Ok::<_, TestCaseError>(())
})?;
}
#[test]
fn broadcast_to_peers(
total_number_of_peers in (1..100usize)
) {
let block: block::Block = zebra_test::vectors::BLOCK_MAINNET_10_BYTES
.zcash_deserialize_into()
.unwrap();
let block_hash = block::Hash::from(&block);
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
let peer_versions = vec![CURRENT_NETWORK_PROTOCOL_VERSION; total_number_of_peers];
let peer_versions = PeerVersions {
peer_versions,
};
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();
let total_number_of_active_peers = check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut handles,
CURRENT_NETWORK_PROTOCOL_VERSION,
)?;
prop_assert_eq!(total_number_of_peers, total_number_of_active_peers);
let number_of_peers_to_broadcast = peer_set.number_of_peers_to_broadcast();
prop_assert!(number_of_peers_to_broadcast >= 1);
if total_number_of_active_peers > 1 {
prop_assert!(number_of_peers_to_broadcast < total_number_of_active_peers);
}
let response_future = peer_set.route_broadcast(Request::AdvertiseBlock(block_hash));
std::mem::drop(response_future);
let mut received = 0;
for mut h in handles {
if let ReceiveRequestAttempt::Request(client_request) = h.try_to_receive_outbound_client_request() {
prop_assert_eq!(client_request.request, Request::AdvertiseBlock(block_hash));
received += 1;
};
}
prop_assert_eq!(received, number_of_peers_to_broadcast);
Ok::<_, TestCaseError>(())
})?;
}
#[test]
fn peerset_always_broadcasts(
total_number_of_peers in (2..10usize)
) {
let block: block::Block = zebra_test::vectors::BLOCK_MAINNET_10_BYTES
.zcash_deserialize_into()
.unwrap();
let block_hash = block::Hash::from(&block);
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
let peer_versions = vec![CURRENT_NETWORK_PROTOCOL_VERSION; total_number_of_peers];
let peer_versions = PeerVersions {
peer_versions,
};
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();
for port in 1u16..total_number_of_peers as u16 {
peer_set.remove(&SocketAddr::new([127, 0, 0, 1].into(), port).into());
handles.remove(0);
check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut handles,
CURRENT_NETWORK_PROTOCOL_VERSION,
)?;
let number_of_peers_to_broadcast = peer_set.number_of_peers_to_broadcast();
let response_future = peer_set.route_broadcast(Request::AdvertiseBlock(block_hash));
std::mem::drop(response_future);
let mut received = 0;
for h in &mut handles {
if let ReceiveRequestAttempt::Request(client_request) = h.try_to_receive_outbound_client_request() {
prop_assert_eq!(client_request.request, Request::AdvertiseBlock(block_hash));
received += 1;
};
}
prop_assert_eq!(received, number_of_peers_to_broadcast);
}
Ok::<_, TestCaseError>(())
})?;
}
#[test]
#[should_panic(expected = "requests must be routed to at least one peer")]
fn panics_when_broadcasting_to_no_peers(
total_number_of_peers in (2..10usize)
) {
let block: block::Block = zebra_test::vectors::BLOCK_MAINNET_10_BYTES
.zcash_deserialize_into()
.unwrap();
let block_hash = block::Hash::from(&block);
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
let peer_versions = vec![CURRENT_NETWORK_PROTOCOL_VERSION; total_number_of_peers];
let peer_versions = PeerVersions {
peer_versions,
};
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(usize::MAX)
.build();
for port in 1u16..=total_number_of_peers as u16 {
peer_set.remove(&SocketAddr::new([127, 0, 0, 1].into(), port).into());
handles.remove(0);
}
let response_future = peer_set.route_broadcast(Request::AdvertiseBlock(block_hash));
std::mem::drop(response_future);
Ok::<_, TestCaseError>(())
})?;
}
}
fn check_if_only_up_to_date_peers_are_live<D, C>(
peer_set: &mut PeerSet<D, C>,
harnesses: &mut Vec<ClientTestHarness>,
minimum_version: Version,
) -> Result<usize, TestCaseError>
where
D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
D::Error: Into<BoxError>,
C: ChainTip,
{
let poll_result = peer_set.ready().now_or_never();
let all_peers_are_outdated = harnesses
.iter()
.all(|harness| harness.remote_version() < minimum_version);
if all_peers_are_outdated {
prop_assert!(poll_result.is_none());
} else {
prop_assert!(matches!(poll_result, Some(Ok(_))));
}
let mut number_of_connected_peers = 0;
for harness in harnesses {
let is_outdated = harness.remote_version() < minimum_version;
let is_connected = harness.wants_connection_heartbeats();
prop_assert!(
is_connected != is_outdated,
"is_connected: {}, is_outdated: {}",
is_connected,
is_outdated,
);
if is_connected {
number_of_connected_peers += 1;
}
}
Ok(number_of_connected_peers)
}