use crate::{
connect, connection::BootstrapGroupMaker, context::ctx, EventSenders, QuicP2pError, WireMsg,
};
use log::trace;
use std::net::SocketAddr;
pub fn start() {
let bootstrap_nodes = bootstrap_nodes();
let event_tx = ctx(|c| c.event_tx.clone());
let maker = BootstrapGroupMaker::new(event_tx);
trace!("Bootstrapping to {:?}", bootstrap_nodes);
for bootstrap_node in bootstrap_nodes {
let _ = connect::connect_to(bootstrap_node, None, Some(&maker));
}
}
pub(crate) fn echo_request(notify: EventSenders) -> bool {
let bootstrap_nodes = bootstrap_nodes();
let mut maker = BootstrapGroupMaker::new(notify);
let send_after_connect = Some((WireMsg::EndpointEchoReq, 0));
trace!("Sending echo requests to nodes {:?}", bootstrap_nodes);
for bootstrap_node in bootstrap_nodes {
let res = connect::connect_to(bootstrap_node, send_after_connect.clone(), Some(&maker));
if let Err(QuicP2pError::DuplicateConnectionToPeer { peer_addr }) = res {
maker.set_already_bootstrapped(peer_addr);
return true;
}
}
false
}
fn bootstrap_nodes() -> Vec<SocketAddr> {
ctx(|c| {
c.bootstrap_cache
.peers()
.iter()
.rev()
.chain(c.bootstrap_cache.hard_coded_contacts().iter())
.cloned()
.collect()
})
}
#[cfg(test)]
mod tests {
use crate::{
test_utils::new_random_qp2p,
utils::{new_unbounded_channels, EventReceivers},
Config, Event, OurType, QuicP2p,
};
use std::collections::{HashSet, VecDeque};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::thread;
use std::time::Duration;
use unwrap::unwrap;
#[test]
fn bootstrap_to_only_one_node_with_delays() {
test_bootstrap_to_multiple_peers(4, Some(10));
}
#[test]
fn bootstrap_to_only_one_node_without_delays() {
test_bootstrap_to_multiple_peers(4, None);
}
fn test_bootstrap_to_multiple_peers(num_conns: u64, delay_ms: Option<u64>) {
let mut bs_nodes = Vec::with_capacity(num_conns as usize);
let mut bs_peer_addrs = HashSet::with_capacity(num_conns as usize);
let mut hcc_contacts = HashSet::with_capacity(num_conns as usize);
for i in 0..num_conns {
let (mut qp2p, rx) = new_random_qp2p(false, Default::default());
if let Some(delay_ms) = delay_ms {
qp2p.set_connect_delay(i * delay_ms);
}
let our_addr = unwrap!(qp2p.our_connection_info());
assert!(bs_peer_addrs.insert(our_addr));
assert!(hcc_contacts.insert(our_addr));
bs_nodes.push((rx, qp2p));
}
let (mut qp2p0, rx0) = new_random_qp2p(false, hcc_contacts);
qp2p0.bootstrap();
match unwrap!(rx0.recv()) {
Event::BootstrappedTo { node, .. } => {
assert!(bs_peer_addrs.contains(&node));
}
ev => panic!("Unexpected event: {:?}", ev),
}
thread::sleep(Duration::from_millis(if let Some(delay_ms) = delay_ms {
num_conns * delay_ms + delay_ms
} else {
100
}));
let conns_count = unwrap!(qp2p0.connections(|c| {
c.iter()
.filter(|(_pa, conn)| conn.to_peer.is_established())
.count()
}));
assert_eq!(conns_count, 1);
}
#[test]
fn bootstrap_prioritise_cache() {
const NODES_COUNT: u64 = 8;
let mut hcc_nodes = Vec::with_capacity(NODES_COUNT as usize / 2);
let mut bootstrap_cache_nodes = Vec::with_capacity(NODES_COUNT as usize / 2);
for _i in 0..(NODES_COUNT / 2) {
let (qp2p, rx) = test_node();
bootstrap_cache_nodes.push((qp2p, rx));
let (qp2p, rx) = test_node();
hcc_nodes.push((qp2p, rx));
}
let bootstrap_cache: VecDeque<_> = bootstrap_cache_nodes
.iter_mut()
.map(|(node, _)| unwrap!(node.our_connection_info()))
.collect();
let hard_coded_contacts: HashSet<_> = hcc_nodes
.iter_mut()
.map(|(node, _)| unwrap!(node.our_connection_info()))
.collect();
let (ev_tx, ev_rx) = new_unbounded_channels();
let mut bootstrapping_node = unwrap!(QuicP2p::with_config(
ev_tx,
Some(Config {
port: Some(0),
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
hard_coded_contacts,
our_type: OurType::Client,
..Default::default()
}),
bootstrap_cache.clone(),
true
));
bootstrapping_node.bootstrap();
for event in ev_rx.iter() {
if let Event::BootstrappedTo { .. } = event {
let attempted_conns = unwrap!(bootstrapping_node.attempted_connections());
assert_eq!(
&attempted_conns[0..(NODES_COUNT as usize / 2)],
bootstrap_cache
.iter()
.rev()
.copied()
.collect::<Vec<_>>()
.as_slice()
);
break;
}
}
}
#[test]
#[ignore] fn bootstrap_failure() {
let (mut bootstrap_node, _rx) = test_node();
bootstrap_node.set_connect_delay(100);
let bootstrap_ci = unwrap!(bootstrap_node.our_connection_info());
let mut hcc = HashSet::with_capacity(1);
assert!(hcc.insert(bootstrap_ci));
let (ev_tx, ev_rx) = new_unbounded_channels();
let mut bootstrap_client = unwrap!(QuicP2p::with_config(
ev_tx,
Some(Config {
port: Some(0),
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
hard_coded_contacts: hcc,
our_type: OurType::Node,
idle_timeout_msec: Some(30),
..Default::default()
}),
Default::default(),
true
));
bootstrap_client.bootstrap();
match unwrap!(ev_rx.recv()) {
Event::BootstrapFailure => (),
ev => {
panic!("Unexpected event: {:?}", ev);
}
}
}
#[test]
fn node_will_attempt_hard_coded_contacts() {
let (mut peer1, _) = test_node();
let peer1_conn_info = unwrap!(peer1.our_connection_info());
let (mut peer2, ev_rx) = {
let mut hcc = HashSet::new();
assert!(hcc.insert(peer1_conn_info));
test_peer_with_hcc(hcc, OurType::Node)
};
peer2.bootstrap();
let peer2_conn_info = unwrap!(peer2.our_connection_info());
for event in ev_rx.iter() {
if let Event::BootstrappedTo { node } = event {
assert_eq!(node, peer1_conn_info);
break;
}
}
let is_peer1_state_valid = unwrap!(peer1.connections(move |c| {
c[&peer2_conn_info].from_peer.is_established()
&& c[&peer2_conn_info].to_peer.is_established()
}));
assert!(is_peer1_state_valid);
let is_peer2_state_valid = unwrap!(peer2.connections(move |c| {
c[&peer1_conn_info].to_peer.is_established()
&& c[&peer1_conn_info].from_peer.is_established()
}));
assert!(is_peer2_state_valid);
}
#[test]
fn client_will_attempt_hard_coded_contacts() {
let (mut peer1, _) = test_node();
let peer1_conn_info = unwrap!(peer1.our_connection_info());
let (mut peer2, ev_rx) = {
let mut hcc = HashSet::new();
assert!(hcc.insert(peer1_conn_info));
test_peer_with_hcc(hcc, OurType::Client)
};
peer2.bootstrap();
let peer2_conn_info = unwrap!(peer2.our_connection_info());
for event in ev_rx.iter() {
if let Event::BootstrappedTo { node } = event {
assert_eq!(node, peer1_conn_info);
break;
}
}
let is_peer1_state_valid = peer1
.connections(move |c| {
c[&peer2_conn_info].from_peer.is_established()
&& c[&peer2_conn_info].to_peer.is_not_needed()
})
.unwrap();
assert!(is_peer1_state_valid);
let is_peer2_state_valid = peer2
.connections(move |c| {
c[&peer1_conn_info].to_peer.is_established()
&& c[&peer1_conn_info].from_peer.is_not_needed()
})
.unwrap();
assert!(is_peer2_state_valid);
}
#[test]
fn node_will_attempt_cached_peers() {
let (mut peer1, _) = test_node();
let peer1_addr = unwrap!(peer1.our_connection_info());
let (mut peer2, ev_rx) = test_peer_with_bootstrap_cache(vec![peer1_addr]);
peer2.bootstrap();
for event in ev_rx.iter() {
if let Event::BootstrappedTo { node } = event {
assert_eq!(node, peer1_addr);
break;
}
}
}
#[test]
fn node_will_report_failure_when_bootstrap_cache_and_hard_coded_contacts_are_empty() {
let (mut peer, ev_rx) = test_node();
peer.bootstrap();
for event in ev_rx.iter() {
if let Event::BootstrapFailure = event {
break;
}
}
}
#[test]
fn node_will_report_failure_when_bootstrap_cache_and_hard_coded_contacts_are_invalid() {
let dummy_node_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 37692));
let (mut peer, ev_rx) = {
let mut hcc = HashSet::new();
assert!(hcc.insert(dummy_node_addr));
test_peer_with_hcc(hcc, OurType::Node)
};
peer.bootstrap();
for event in ev_rx.iter() {
if let Event::BootstrapFailure = event {
break;
}
}
}
fn test_peer_with_bootstrap_cache(
mut cached_peers: Vec<SocketAddr>,
) -> (QuicP2p, EventReceivers) {
let cached_peers: VecDeque<_> = cached_peers.drain(..).collect();
let (ev_tx, ev_rx) = new_unbounded_channels();
let quic_p2p = unwrap!(QuicP2p::with_config(
ev_tx,
Some(Config {
port: Some(0),
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
..Default::default()
}),
cached_peers,
true
));
(quic_p2p, ev_rx)
}
fn test_node() -> (QuicP2p, EventReceivers) {
test_peer_with_hcc(Default::default(), OurType::Node)
}
fn test_peer_with_hcc(
hard_coded_contacts: HashSet<SocketAddr>,
our_type: OurType,
) -> (QuicP2p, EventReceivers) {
let (ev_tx, ev_rx) = new_unbounded_channels();
let quic_p2p = unwrap!(QuicP2p::with_config(
ev_tx,
Some(Config {
port: Some(0),
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
hard_coded_contacts,
our_type,
..Default::default()
}),
Default::default(),
true
));
(quic_p2p, ev_rx)
}
}