use crate::connect;
use crate::connection::BootstrapGroupMaker;
use crate::context::ctx;
/// Starts a bootstrap attempt against every known contact.
///
/// The candidate list is the bootstrap cache's `peers()` walked in reverse, followed by
/// its `hard_coded_contacts()`. One connection attempt is issued per candidate, and all
/// attempts share a single `BootstrapGroupMaker` built from the context's event sender.
pub fn start() {
    let (candidates, event_tx) = ctx(|c| {
        let cache = &c.bootstrap_cache;
        let candidates: Vec<_> = cache
            .peers()
            .iter()
            .rev()
            .chain(cache.hard_coded_contacts().iter())
            .cloned()
            .collect();
        (candidates, c.event_tx.clone())
    });

    let group_maker = BootstrapGroupMaker::new(event_tx);
    for candidate in candidates {
        // The result of each individual attempt is explicitly discarded.
        let _ = connect::connect_to(candidate, None, Some(&group_maker));
    }
}
#[cfg(test)]
mod tests {
use crate::test_utils::new_random_qp2p;
use crate::{Builder, Config, Event, NodeInfo, OurType, QuicP2p};
use crossbeam_channel as mpmc;
use std::collections::{HashSet, VecDeque};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::thread;
use std::time::Duration;
/// Runs the multi-peer bootstrap scenario with 4 peers and a `delay_ms` step of 10.
#[test]
fn bootstrap_to_only_one_node_with_delays() {
    let delay_step_ms = Some(10);
    test_bootstrap_to_multiple_peers(4, delay_step_ms);
}
/// Runs the multi-peer bootstrap scenario with 4 peers and no connect delay configured.
#[test]
fn bootstrap_to_only_one_node_without_delays() {
    let no_delay = None;
    test_bootstrap_to_multiple_peers(4, no_delay);
}
/// Bootstraps one peer against `num_conns` candidate peers and checks that it ends up
/// with exactly one connection whose `to_peer` side is established.
///
/// When `delay_ms` is `Some`, candidate `i` gets a connect delay of `i * delay_ms`.
fn test_bootstrap_to_multiple_peers(num_conns: u64, delay_ms: Option<u64>) {
    let capacity = num_conns as usize;
    // The candidates (and their receivers) are kept in here until the end of the test.
    let mut bs_nodes = Vec::with_capacity(capacity);
    let mut bs_peer_addrs = HashSet::with_capacity(capacity);
    let mut hcc_contacts = HashSet::with_capacity(capacity);

    for idx in 0..num_conns {
        let (mut candidate, events) = new_random_qp2p(false, Default::default());
        if let Some(step) = delay_ms {
            candidate.set_connect_delay(idx * step);
        }

        let info = unwrap!(candidate.our_connection_info());
        // Both the address and the full connection info must be unique per candidate.
        assert!(bs_peer_addrs.insert(info.peer_addr));
        assert!(hcc_contacts.insert(info));
        bs_nodes.push((events, candidate));
    }

    let (mut bootstrapper, bootstrapper_rx) = new_random_qp2p(false, hcc_contacts);
    bootstrapper.bootstrap();

    // The very first event must report a bootstrap to one of the candidates.
    match unwrap!(bootstrapper_rx.recv()) {
        Event::BootstrappedTo { node, .. } => {
            assert!(bs_peer_addrs.contains(&node.peer_addr));
        }
        ev => panic!("Unexpected event: {:?}", ev),
    }

    // Wait before counting the established connections.
    let settle_ms = match delay_ms {
        Some(step) => num_conns * step + step,
        None => 100,
    };
    thread::sleep(Duration::from_millis(settle_ms));

    let established = unwrap!(bootstrapper.connections(|conns| {
        conns
            .iter()
            .filter(|(_addr, conn)| conn.to_peer.is_established())
            .count()
    }));
    assert_eq!(established, 1);
}
/// Checks that bootstrap cache entries are attempted before hard-coded contacts.
///
/// Half of the nodes go into the bootstrap cache and half into the hard-coded contacts.
/// Once the bootstrapping node reports `BootstrappedTo`, the first `NODES_COUNT / 2`
/// attempted connections must equal the cache addresses in reverse order.
#[test]
fn bootstrap_prioritise_cache() {
    const NODES_COUNT: u64 = 8;
    const HALF: usize = NODES_COUNT as usize / 2;

    // The nodes are kept in these vectors until the end of the test.
    let mut hcc_nodes = Vec::with_capacity(HALF);
    let mut bootstrap_cache_nodes = Vec::with_capacity(HALF);
    for _ in 0..HALF {
        bootstrap_cache_nodes.push(test_node());
        hcc_nodes.push(test_node());
    }

    let bootstrap_cache: VecDeque<NodeInfo> = bootstrap_cache_nodes
        .iter_mut()
        .map(|(node, _)| unwrap!(node.our_connection_info()))
        .collect();
    let hard_coded_contacts: HashSet<NodeInfo> = hcc_nodes
        .iter_mut()
        .map(|(node, _)| unwrap!(node.our_connection_info()))
        .collect();

    // Computed up front so the cache itself can be moved into the builder without a clone.
    let expected_order: Vec<_> = bootstrap_cache
        .iter()
        .rev()
        .map(|node| node.peer_addr)
        .collect();

    let (ev_tx, ev_rx) = mpmc::unbounded();
    let mut bootstrapping_node = unwrap!(Builder::new(ev_tx)
        .with_config(Config {
            port: Some(0),
            ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
            hard_coded_contacts,
            our_type: OurType::Client,
            ..Default::default()
        })
        .with_proxies(bootstrap_cache, true)
        .build());
    bootstrapping_node.bootstrap();

    for event in ev_rx.iter() {
        match event {
            Event::BootstrappedTo { .. } => {
                let attempted_conns = unwrap!(bootstrapping_node.attempted_connections());
                assert_eq!(&attempted_conns[0..HALF], expected_order.as_slice());
                break;
            }
            // Fail fast: waiting on the channel after a failure would block forever.
            Event::BootstrapFailure => panic!("Bootstrap failed unexpectedly"),
            _ => (),
        }
    }
}
/// Expects `Event::BootstrapFailure` as the first event when the only hard-coded contact
/// has a connect delay of 100 and the bootstrapping peer's `idle_timeout_msec` is 30.
#[test]
fn bootstrap_failure() {
    // `_rx` is bound so the contact's event receiver lives for the whole test.
    let (mut bootstrap_node, _rx) = test_node();
    bootstrap_node.set_connect_delay(100);

    let bootstrap_ci = unwrap!(bootstrap_node.our_connection_info());
    let mut hcc = HashSet::with_capacity(1);
    // The connection info is not needed afterwards, so it is moved rather than cloned.
    assert!(hcc.insert(bootstrap_ci));

    let (ev_tx, ev_rx) = mpmc::unbounded();
    let config = Config {
        port: Some(0),
        ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
        hard_coded_contacts: hcc,
        our_type: OurType::Node,
        idle_timeout_msec: Some(30),
        ..Default::default()
    };
    let mut bootstrap_client = unwrap!(Builder::new(ev_tx)
        .with_config(config)
        .with_proxies(Default::default(), true)
        .build());
    bootstrap_client.bootstrap();

    match unwrap!(ev_rx.recv()) {
        Event::BootstrapFailure => (),
        ev => panic!("Unexpected event: {:?}", ev),
    }
}
/// A `Node`-type peer given one hard-coded contact must bootstrap to it, after which
/// both peers must report `from_peer` and `to_peer` as established for each other.
#[test]
fn node_will_attempt_hard_coded_contacts() {
    let (mut peer1, _) = test_node();
    let peer1_conn_info = unwrap!(peer1.our_connection_info());

    let mut hcc = HashSet::new();
    assert!(hcc.insert(peer1_conn_info.clone()));
    let (mut peer2, ev_rx) = test_peer_with_hcc(hcc, OurType::Node);

    peer2.bootstrap();
    let peer2_conn_info = unwrap!(peer2.our_connection_info());

    for event in ev_rx.iter() {
        match event {
            Event::BootstrappedTo { node } => {
                assert_eq!(node, peer1_conn_info);
                break;
            }
            // Fail fast: waiting on the channel after a failure would block forever.
            Event::BootstrapFailure => panic!("Bootstrap failed unexpectedly"),
            _ => (),
        }
    }

    let is_peer1_state_valid = unwrap!(peer1.connections(move |c| {
        let conn = &c[&peer2_conn_info.peer_addr];
        conn.from_peer.is_established() && conn.to_peer.is_established()
    }));
    assert!(is_peer1_state_valid);

    let is_peer2_state_valid = unwrap!(peer2.connections(move |c| {
        let conn = &c[&peer1_conn_info.peer_addr];
        conn.to_peer.is_established() && conn.from_peer.is_established()
    }));
    assert!(is_peer2_state_valid);
}
/// A `Client`-type peer given one hard-coded contact must bootstrap to it.
///
/// Afterwards the contact must see `from_peer` established and `to_peer` not needed for
/// the client, while the client must see `to_peer` established and `from_peer` not needed
/// for the contact.
#[test]
fn client_will_attempt_hard_coded_contacts() {
    let (mut peer1, _) = test_node();
    let peer1_conn_info = unwrap!(peer1.our_connection_info());

    let mut hcc = HashSet::new();
    assert!(hcc.insert(peer1_conn_info.clone()));
    let (mut peer2, ev_rx) = test_peer_with_hcc(hcc, OurType::Client);

    peer2.bootstrap();
    let peer2_conn_info = unwrap!(peer2.our_connection_info());

    for event in ev_rx.iter() {
        match event {
            Event::BootstrappedTo { node } => {
                assert_eq!(node, peer1_conn_info);
                break;
            }
            // Fail fast: waiting on the channel after a failure would block forever.
            Event::BootstrapFailure => panic!("Bootstrap failed unexpectedly"),
            _ => (),
        }
    }

    let is_peer1_state_valid = unwrap!(peer1.connections(move |c| {
        let conn = &c[&peer2_conn_info.peer_addr];
        conn.from_peer.is_established() && conn.to_peer.is_not_needed()
    }));
    assert!(is_peer1_state_valid);

    let is_peer2_state_valid = unwrap!(peer2.connections(move |c| {
        let conn = &c[&peer1_conn_info.peer_addr];
        conn.to_peer.is_established() && conn.from_peer.is_not_needed()
    }));
    assert!(is_peer2_state_valid);
}
/// A peer whose bootstrap cache holds a single entry must bootstrap to that entry.
#[test]
fn node_will_attempt_cached_peers() {
    let (mut peer1, _) = test_node();
    let peer1_conn_info = unwrap!(peer1.our_connection_info());

    let (mut peer2, ev_rx) = test_peer_with_bootstrap_cache(vec![peer1_conn_info.clone()]);
    peer2.bootstrap();

    for event in ev_rx.iter() {
        match event {
            Event::BootstrappedTo { node } => {
                assert_eq!(node, peer1_conn_info);
                break;
            }
            // Fail fast: waiting on the channel after a failure would block forever.
            Event::BootstrapFailure => panic!("Bootstrap failed unexpectedly"),
            _ => (),
        }
    }
}
/// With neither cached peers nor hard-coded contacts, bootstrapping must end in
/// `Event::BootstrapFailure`.
#[test]
fn node_will_report_failure_when_bootstrap_cache_and_hard_coded_contacts_are_empty() {
    let (mut peer, ev_rx) = test_node();
    peer.bootstrap();

    for event in ev_rx.iter() {
        match event {
            Event::BootstrapFailure => break,
            // Fail fast: a successful bootstrap here is a bug, and waiting on the channel
            // for a failure event after it would block forever.
            ev @ Event::BootstrappedTo { .. } => panic!("Unexpected event: {:?}", ev),
            _ => (),
        }
    }
}
/// With a single hard-coded contact made of a fixed localhost address and a dummy
/// certificate, bootstrapping must end in `Event::BootstrapFailure`.
#[test]
fn node_will_report_failure_when_bootstrap_cache_and_hard_coded_contacts_are_invalid() {
    let dummy_peer_info = NodeInfo {
        peer_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 37692)),
        peer_cert_der: vec![1, 2, 3],
    };

    let mut hcc = HashSet::new();
    // The dummy info is not needed afterwards, so it is moved rather than cloned.
    assert!(hcc.insert(dummy_peer_info));
    let (mut peer, ev_rx) = test_peer_with_hcc(hcc, OurType::Node);

    peer.bootstrap();

    for event in ev_rx.iter() {
        match event {
            Event::BootstrapFailure => break,
            // Fail fast: a successful bootstrap here is a bug, and waiting on the channel
            // for a failure event after it would block forever.
            ev @ Event::BootstrappedTo { .. } => panic!("Unexpected event: {:?}", ev),
            _ => (),
        }
    }
}
/// Builds a peer bound to localhost on port 0 whose proxies are the given `cached_peers`.
///
/// Returns the peer together with the receiving end of its event channel.
fn test_peer_with_bootstrap_cache(
    cached_peers: Vec<NodeInfo>,
) -> (QuicP2p, mpmc::Receiver<Event>) {
    // Direct conversion keeps the element order; no draining or `mut` binding is needed.
    let cached_peers = VecDeque::from(cached_peers);
    let (ev_tx, ev_rx) = mpmc::unbounded();
    let config = Config {
        port: Some(0),
        ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
        ..Default::default()
    };
    let builder = Builder::new(ev_tx)
        .with_config(config)
        .with_proxies(cached_peers, true);
    (unwrap!(builder.build()), ev_rx)
}
fn test_node() -> (QuicP2p, mpmc::Receiver<Event>) {
test_peer_with_hcc(Default::default(), OurType::Node)
}
/// Builds a peer of the given `our_type` bound to localhost on port 0, configured with
/// `hard_coded_contacts` and default (empty) proxies.
///
/// Returns the peer together with the receiving end of its event channel.
fn test_peer_with_hcc(
    hard_coded_contacts: HashSet<NodeInfo>,
    our_type: OurType,
) -> (QuicP2p, mpmc::Receiver<Event>) {
    let (ev_tx, ev_rx) = mpmc::unbounded();
    let config = Config {
        ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
        port: Some(0),
        hard_coded_contacts,
        our_type,
        ..Default::default()
    };
    let peer = unwrap!(Builder::new(ev_tx)
        .with_config(config)
        .with_proxies(Default::default(), true)
        .build());
    (peer, ev_rx)
}
}