use std::{cmp::max, collections::HashSet, iter, net::SocketAddr, sync::Arc, time::Duration};
use tokio::time::timeout;
use tower::{Service, ServiceExt};
use zebra_chain::{
block,
parameters::{Network, NetworkUpgrade},
};
use crate::{
constants::DEFAULT_MAX_CONNS_PER_IP,
peer::{ClientRequest, MinimumPeerVersion},
peer_set::inventory_registry::InventoryStatus,
protocol::external::{types::Version, InventoryHash},
PeerSocketAddr, Request, SharedPeerError,
};
use indexmap::IndexMap;
use tokio::sync::watch;
use super::{PeerSetBuilder, PeerVersions};
#[test]
fn peer_set_ready_single_connection() {
let peer_versions = PeerVersions {
peer_versions: vec![Version::min_specified_for_upgrade(
&Network::Mainnet,
NetworkUpgrade::Nu6,
)],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
let mut client_handle = handles
.into_iter()
.next()
.expect("we always have at least one client");
assert!(client_handle
.try_to_receive_outbound_client_request()
.is_empty());
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
let peer_ready_future = peer_set.ready();
#[allow(unknown_lints)]
#[allow(clippy::drop_non_drop)]
std::mem::drop(peer_ready_future);
let peer_ready1 = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert!(client_handle
.try_to_receive_outbound_client_request()
.is_empty());
let fut = peer_ready1.call(Request::Peers);
assert!(matches!(
client_handle
.try_to_receive_outbound_client_request()
.request(),
Some(ClientRequest {
request: Request::Peers,
..
})
));
std::mem::drop(fut);
let peer_ready2 = peer_set
.ready()
.await
.expect("peer set service is always ready");
let _fut = peer_ready2.call(Request::MempoolTransactionIds);
assert!(matches!(
client_handle
.try_to_receive_outbound_client_request()
.request(),
Some(ClientRequest {
request: Request::MempoolTransactionIds,
..
})
));
});
}
#[test]
fn peer_set_ready_multiple_connections() {
let peer_version = Version::min_specified_for_upgrade(&Network::Mainnet, NetworkUpgrade::Nu6);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version, peer_version],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
tokio::time::pause();
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
assert_eq!(handles.len(), 3);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(max(3, DEFAULT_MAX_CONNS_PER_IP))
.build();
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert_eq!(peer_ready.ready_services.len(), 3);
handles[0].stop_connection_task().await;
handles[1].stop_connection_task().await;
peer_set
.ready()
.await
.expect("peer set service is always ready");
handles[2].stop_connection_task().await;
let peer_ready = peer_set.ready();
assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err());
});
}
#[test]
fn peer_set_rejects_connections_past_per_ip_limit() {
const NUM_PEER_VERSIONS: usize = crate::constants::DEFAULT_MAX_CONNS_PER_IP + 1;
let peer_version = Version::min_specified_for_upgrade(&Network::Mainnet, NetworkUpgrade::Nu6);
let peer_versions = PeerVersions {
peer_versions: [peer_version; NUM_PEER_VERSIONS].into_iter().collect(),
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
tokio::time::pause();
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
assert_eq!(handles.len(), NUM_PEER_VERSIONS);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert_eq!(
peer_ready.ready_services.len(),
crate::constants::DEFAULT_MAX_CONNS_PER_IP
);
});
}
#[test]
fn peer_set_route_inv_empty_registry() {
let test_hash = block::Hash([0; 32]);
let peer_version = Version::min_specified_for_upgrade(&Network::Mainnet, NetworkUpgrade::Nu6);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
tokio::time::pause();
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
assert_eq!(handles.len(), 2);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(max(2, DEFAULT_MAX_CONNS_PER_IP))
.build();
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert_eq!(peer_ready.ready_services.len(), 2);
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let _fut = peer_ready.call(sent_request.clone());
let mut received_count = 0;
for mut handle in handles {
if let Some(ClientRequest { request, .. }) =
handle.try_to_receive_outbound_client_request().request()
{
assert_eq!(sent_request, request);
received_count += 1;
}
}
assert_eq!(received_count, 1);
});
}
#[test]
fn broadcast_all_queued_removes_banned_peers() {
let peer_versions = PeerVersions {
peer_versions: vec![Version::min_specified_for_upgrade(
&Network::Mainnet,
NetworkUpgrade::Nu6,
)],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
let (discovered_peers, _handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
let banned_ip: std::net::IpAddr = "127.0.0.1".parse().unwrap();
let mut bans_map: IndexMap<std::net::IpAddr, std::time::Instant> = IndexMap::new();
bans_map.insert(banned_ip, std::time::Instant::now());
let (bans_tx, bans_rx) = watch::channel(Arc::new(bans_map));
let _ = bans_tx;
peer_set.bans_receiver = bans_rx;
let banned_addr: PeerSocketAddr = SocketAddr::new(banned_ip, 1).into();
let mut remaining_peers = HashSet::new();
remaining_peers.insert(banned_addr);
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
peer_set.queued_broadcast_all = Some((Request::Peers, sender, remaining_peers));
peer_set.broadcast_all_queued();
if let Some((_req, _sender, remaining_peers)) = peer_set.queued_broadcast_all.take() {
assert!(remaining_peers.is_empty());
} else {
assert!(receiver.try_recv().is_ok());
}
});
}
#[test]
fn remove_unready_peer_clears_cancel_handle_and_updates_counts() {
let peer_versions = PeerVersions {
peer_versions: vec![Version::min_specified_for_upgrade(
&Network::Mainnet,
NetworkUpgrade::Nu6,
)],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
let (discovered_peers, _handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
runtime.block_on(async move {
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
let banned_ip: std::net::IpAddr = "127.0.0.1".parse().unwrap();
let mut bans_map: IndexMap<std::net::IpAddr, std::time::Instant> = IndexMap::new();
bans_map.insert(banned_ip, std::time::Instant::now());
let (_bans_tx, bans_rx) = watch::channel(Arc::new(bans_map));
peer_set.bans_receiver = bans_rx;
let banned_addr: PeerSocketAddr = SocketAddr::new(banned_ip, 1).into();
let (tx, _rx) =
crate::peer_set::set::oneshot::channel::<crate::peer_set::set::CancelClientWork>();
peer_set.cancel_handles.insert(banned_addr, tx);
assert_eq!(peer_set.num_peers_with_ip(banned_ip), 1);
peer_set.remove(&banned_addr);
assert!(!peer_set.cancel_handles.contains_key(&banned_addr));
assert_eq!(peer_set.num_peers_with_ip(banned_ip), 0);
});
}
#[test]
fn peer_set_route_inv_advertised_registry() {
peer_set_route_inv_advertised_registry_order(true);
peer_set_route_inv_advertised_registry_order(false);
}
fn peer_set_route_inv_advertised_registry_order(advertised_first: bool) {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
let test_peer = if advertised_first {
"127.0.0.1:1"
} else {
"127.0.0.1:2"
}
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_available(test_inv, test_peer);
let peer_version = Version::min_specified_for_upgrade(&Network::Mainnet, NetworkUpgrade::Nu6);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
tokio::time::pause();
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
assert_eq!(handles.len(), 2);
runtime.block_on(async move {
let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(max(2, DEFAULT_MAX_CONNS_PER_IP))
.build();
peer_set_guard
.inventory_sender()
.as_mut()
.expect("unexpected missing inv sender")
.send(test_change)
.expect("unexpected dropped receiver");
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert_eq!(peer_ready.ready_services.len(), 2);
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let _fut = peer_ready.call(sent_request.clone());
let advertised_handle = if advertised_first {
&mut handles[0]
} else {
&mut handles[1]
};
if let Some(ClientRequest { request, .. }) = advertised_handle
.try_to_receive_outbound_client_request()
.request()
{
assert_eq!(sent_request, request);
} else {
panic!("inv request not routed to advertised peer");
}
let other_handle = if advertised_first {
&mut handles[1]
} else {
&mut handles[0]
};
assert!(
other_handle
.try_to_receive_outbound_client_request()
.request()
.is_none(),
"request routed to non-advertised peer",
);
});
}
#[test]
fn peer_set_route_inv_missing_registry() {
peer_set_route_inv_missing_registry_order(true);
peer_set_route_inv_missing_registry_order(false);
}
fn peer_set_route_inv_missing_registry_order(missing_first: bool) {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
let test_peer = if missing_first {
"127.0.0.1:1"
} else {
"127.0.0.1:2"
}
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_missing(test_inv, test_peer);
let peer_version = Version::min_specified_for_upgrade(&Network::Mainnet, NetworkUpgrade::Nu6);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
tokio::time::pause();
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
assert_eq!(handles.len(), 2);
runtime.block_on(async move {
let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.max_conns_per_ip(max(2, DEFAULT_MAX_CONNS_PER_IP))
.build();
peer_set_guard
.inventory_sender()
.as_mut()
.expect("unexpected missing inv sender")
.send(test_change)
.expect("unexpected dropped receiver");
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert_eq!(peer_ready.ready_services.len(), 2);
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let _fut = peer_ready.call(sent_request.clone());
let missing_handle = if missing_first {
&mut handles[0]
} else {
&mut handles[1]
};
assert!(
missing_handle
.try_to_receive_outbound_client_request()
.request()
.is_none(),
"request routed to missing peer",
);
let other_handle = if missing_first {
&mut handles[1]
} else {
&mut handles[0]
};
if let Some(ClientRequest { request, .. }) = other_handle
.try_to_receive_outbound_client_request()
.request()
{
assert_eq!(sent_request, request);
} else {
panic!(
"inv request should have been routed to the only peer not missing the inventory"
);
}
});
}
#[test]
fn peer_set_route_inv_all_missing_fail() {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
let test_peer = "127.0.0.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_missing(test_inv, test_peer);
let peer_version = Version::min_specified_for_upgrade(&Network::Mainnet, NetworkUpgrade::Nu6);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version],
};
let (runtime, _init_guard) = zebra_test::init_async();
let _guard = runtime.enter();
tokio::time::pause();
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(&Network::Mainnet);
assert_eq!(handles.len(), 1);
runtime.block_on(async move {
let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
peer_set_guard
.inventory_sender()
.as_mut()
.expect("unexpected missing inv sender")
.send(test_change)
.expect("unexpected dropped receiver");
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
assert_eq!(peer_ready.ready_services.len(), 1);
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let response_fut = peer_ready.call(sent_request.clone());
let missing_handle = &mut handles[0];
assert!(
missing_handle
.try_to_receive_outbound_client_request()
.request().is_none(),
"request routed to missing peer",
);
let response = response_fut.await;
assert_eq!(
response
.expect_err("peer set should return an error (not a Response)")
.downcast_ref::<SharedPeerError>()
.expect("peer set should return a boxed SharedPeerError")
.inner_debug(),
"NotFoundRegistry([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])"
);
});
}