use std::{
env,
net::SocketAddr,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use proptest::{
collection::{hash_map, vec},
prelude::*,
};
use tokio::time::{sleep, timeout};
use tracing::Span;
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
use crate::{
canonical_peer_addr,
constants::{DEFAULT_MAX_CONNS_PER_IP, MIN_OUTBOUND_PEER_CONNECTION_INTERVAL},
meta_addr::{MetaAddr, MetaAddrChange},
AddressBook, BoxError, Request, Response,
};
use super::super::{validate_addrs, CandidateSet};
/// Exclusive upper bound on the number of candidates requested in each phase of the
/// rate-limit test, and the multiplier used for that test's sleep and timeout durations.
const MAX_TEST_CANDIDATES: u32 = 4;

/// Number of peer addresses generated for the rate-limit test, and the exclusive upper
/// bound for the gossiped-peer count and the "next peer" attempt count in the other tests.
///
/// This is twice `MAX_TEST_CANDIDATES`.
const TEST_ADDRESSES: usize = (MAX_TEST_CANDIDATES as usize) * 2;

/// Number of proptest cases run by the sleeping test when the `PROPTEST_CASES`
/// environment variable is unset or cannot be parsed.
const DEFAULT_SLEEP_TEST_PROPTEST_CASES: u32 = 16;

/// Extra slack allowed on top of each expected delay before a timing assertion fails.
const MAX_SLEEP_EXTRA_DELAY: Duration = Duration::from_secs(1);
proptest! {
    /// Checks that `validate_addrs` never yields a peer whose untrusted last-seen time
    /// is later than the limit it was given.
    ///
    /// The input is between 1 and `TEST_ADDRESSES - 1` peers from
    /// `MetaAddr::gossiped_strategy()`, and an arbitrary `DateTime32` limit.
    #[test]
    fn no_last_seen_times_are_in_the_future(
        gossiped_peers in vec(MetaAddr::gossiped_strategy(), 1..TEST_ADDRESSES),
        last_seen_limit in any::<DateTime32>(),
    ) {
        let _init_guard = zebra_test::init();

        let validated_peers = validate_addrs(gossiped_peers, last_seen_limit);

        for peer in validated_peers {
            // NOTE(review): the unwrap assumes every gossiped peer carries an untrusted
            // last-seen time — confirm against `MetaAddr::gossiped_strategy()`.
            prop_assert!(peer.untrusted_last_seen().unwrap() <= last_seen_limit);
        }
    }

    /// Checks that a `CandidateSet` whose address book never had a peer added answers
    /// every `next()` call with `None` in less than `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`.
    ///
    /// The call is repeated `next_peer_attempts` times, so a delay introduced by an
    /// earlier "no peer" answer would make a later call hit the timeout.
    #[test]
    fn skipping_outbound_peer_connection_skips_rate_limit(next_peer_attempts in 0..TEST_ADDRESSES) {
        let (runtime, _init_guard) = zebra_test::init_async();
        let _guard = runtime.enter();

        let peer_service = tower::service_fn(|_| async {
            unreachable!("Mock peer service is never used");
        });

        // Nothing is ever added to this address book, so there is no peer to return.
        let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), &Mainnet, DEFAULT_MAX_CONNS_PER_IP, Span::none());
        let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);

        // The per-call time budget does not change between attempts, so compute it once.
        let less_than_min_interval = MIN_OUTBOUND_PEER_CONNECTION_INTERVAL - Duration::from_millis(1);

        for _ in 0..next_peer_attempts {
            // `Ok(..)` means the call finished inside the budget; `None` means no peer.
            // Use the proptest assertion, like the other test in this block, so a failure
            // is reported as a test-case error instead of a panic.
            prop_assert_eq!(
                runtime.block_on(timeout(less_than_min_interval, candidate_set.next())),
                Ok(None),
                "an empty candidate set should answer with no peer before the minimum connection interval elapses"
            );
        }
    }
}
proptest! {
    // The test in this block sleeps, so it runs `DEFAULT_SLEEP_TEST_PROPTEST_CASES` cases
    // unless the `PROPTEST_CASES` environment variable holds a parseable override.
    #![proptest_config(proptest::test_runner::Config::with_cases(
        env::var("PROPTEST_CASES")
            .ok()
            .and_then(|cases| cases.parse().ok())
            .unwrap_or(DEFAULT_SLEEP_TEST_PROPTEST_CASES)
    ))]

    /// Checks the timing of candidates returned by a `CandidateSet` whose address book
    /// holds `TEST_ADDRESSES` initial peers.
    ///
    /// `peers` is generated as a map from IP to port, so each generated IP appears once.
    /// The test requests `initial_candidates` peers, sleeps for
    /// `MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`, then requests
    /// `extra_candidates` more. Both phases are checked by `check_candidates_rate_limiting`,
    /// and the whole sequence must finish before an overall timeout.
    #[test]
    fn new_outbound_peer_connections_are_rate_limited(
        peers in hash_map(
            MetaAddrChange::ready_outbound_strategy_ip(),
            MetaAddrChange::ready_outbound_strategy_port(),
            TEST_ADDRESSES,
        ),
        initial_candidates in 0..MAX_TEST_CANDIDATES,
        extra_candidates in 0..MAX_TEST_CANDIDATES,
    ) {
        let (runtime, _init_guard) = zebra_test::init_async();
        let _guard = runtime.enter();

        let peer_service = tower::service_fn(|_| async {
            unreachable!("Mock peer service is never used");
        });

        // Turn every generated (IP, port) pair into an initial-peer address book entry.
        let peers: Vec<_> = peers
            .into_iter()
            .map(|(peer_ip, peer_port)| {
                let peer_addr = canonical_peer_addr(SocketAddr::new(peer_ip, peer_port));
                MetaAddr::new_initial_peer(peer_addr)
            })
            .collect();

        let mut address_book = AddressBook::new(
            SocketAddr::from_str("0.0.0.0:0").unwrap(),
            &Mainnet,
            DEFAULT_MAX_CONNS_PER_IP,
            Span::none(),
        );
        address_book.extend(peers);

        let mut candidate_set = CandidateSet::new(
            Arc::new(std::sync::Mutex::new(address_book)),
            peer_service,
        );

        // Overall time budget: three times the longest rate-limited wait, plus one unit of
        // slack for each of up to `2 * MAX_TEST_CANDIDATES` candidates and one spare.
        let max_rate_limit_sleep = 3 * MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
        let max_extra_delay = (2 * MAX_TEST_CANDIDATES + 1) * MAX_SLEEP_EXTRA_DELAY;

        let checks = async move {
            // First phase: timing of the initial batch of candidates.
            check_candidates_rate_limiting(&mut candidate_set, initial_candidates).await;

            // Stay idle for several minimum intervals before asking again.
            sleep(MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL).await;

            // Second phase: the same timing checks must still hold after the idle period.
            check_candidates_rate_limiting(&mut candidate_set, extra_candidates).await;
        };

        assert!(runtime.block_on(timeout(max_rate_limit_sleep + max_extra_delay, checks)).is_ok());
    }
}
/// Requests `candidates` peers from `candidate_set`, one after another, and asserts
/// that each one arrives inside the expected time window.
///
/// The first peer must arrive between the instant this function starts and
/// `MAX_SLEEP_EXTRA_DELAY` after it. Every later peer must arrive at least
/// `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL` after the previous one, and at most
/// `MAX_SLEEP_EXTRA_DELAY` beyond that.
///
/// # Panics
///
/// Panics if `candidate_set.next()` yields `None`, or if a peer arrives before the
/// minimum instant or after the maximum instant of its window.
async fn check_candidates_rate_limiting<S>(candidate_set: &mut CandidateSet<S>, candidates: u32)
where
    S: tower::Service<Request, Response = Response, Error = BoxError> + Send,
    S::Future: Send + 'static,
{
    // The window for the first candidate opens immediately.
    let started_at = Instant::now();
    let mut minimum_reconnect_instant = started_at;
    let mut maximum_reconnect_instant = started_at + MAX_SLEEP_EXTRA_DELAY;

    for _ in 0..candidates {
        let got_candidate = candidate_set.next().await.is_some();
        assert!(got_candidate, "there are enough available candidates");

        // Measure the arrival time only after the candidate has been returned.
        let now = Instant::now();

        assert!(
            now >= minimum_reconnect_instant,
            "all candidates should obey the minimum rate-limit: now: {now:?} min: {minimum_reconnect_instant:?}",
        );
        assert!(
            now <= maximum_reconnect_instant,
            "rate-limited candidates should not be delayed too long: now: {now:?} max: {maximum_reconnect_instant:?}. Hint: is the test machine overloaded?",
        );

        // The next window is measured from this arrival.
        minimum_reconnect_instant = now + MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
        maximum_reconnect_instant = minimum_reconnect_instant + MAX_SLEEP_EXTRA_DELAY;
    }
}