use crate::{driver::NodeBehaviour, multiaddr_get_p2p, multiaddr_pop_p2p};
use libp2p::{
core::ConnectedPoint,
swarm::{
dial_opts::{DialOpts, PeerCondition},
DialError,
},
Multiaddr, PeerId, Swarm,
};
use rand::seq::SliceRandom;
use std::collections::{HashSet, VecDeque};
pub(crate) const INITIAL_BOOTSTRAP_CHECK_INTERVAL: std::time::Duration =
std::time::Duration::from_secs(1);
const CONCURRENT_DIALS: usize = 3;
const MAX_PEERS_BEFORE_TERMINATION: usize = 5;
pub(crate) struct InitialBootstrapTrigger {
pub(crate) upnp: bool,
pub(crate) upnp_gateway_result_obtained: bool,
pub(crate) listen_addr_obtained: bool,
}
impl InitialBootstrapTrigger {
pub(crate) fn new(upnp: bool) -> Self {
Self {
upnp,
upnp_gateway_result_obtained: false,
listen_addr_obtained: false,
}
}
pub(crate) fn should_trigger_initial_bootstrap(&self) -> bool {
if self.upnp {
return self.upnp_gateway_result_obtained;
}
if self.listen_addr_obtained {
return true;
}
false
}
}
pub(crate) struct InitialBootstrap {
initial_addrs: VecDeque<Multiaddr>,
ongoing_dials: HashSet<Multiaddr>,
bootstrap_completed: bool,
initial_bootstrap_peer_ids: HashSet<PeerId>,
}
impl InitialBootstrap {
pub(crate) fn new(mut initial_addrs: Vec<Multiaddr>) -> Self {
let bootstrap_completed = if initial_addrs.is_empty() {
info!("No initial addresses provided for bootstrap. Initial bootstrap process will not be triggered.");
true
} else {
let mut rng = rand::thread_rng();
initial_addrs.shuffle(&mut rng);
false
};
let initial_bootstrap_peer_ids =
initial_addrs.iter().filter_map(multiaddr_get_p2p).collect();
Self {
initial_addrs: initial_addrs.into(),
ongoing_dials: Default::default(),
bootstrap_completed,
initial_bootstrap_peer_ids,
}
}
pub(crate) fn is_bootstrap_peer(&self, peer_id: &PeerId) -> bool {
self.initial_bootstrap_peer_ids.contains(peer_id)
}
pub(crate) fn has_terminated(&self) -> bool {
self.bootstrap_completed
}
pub(crate) fn trigger_bootstrapping_process(
&mut self,
swarm: &mut Swarm<NodeBehaviour>,
peers_in_rt: usize,
) {
if !self.should_we_continue_bootstrapping(peers_in_rt, true) {
return;
}
while self.ongoing_dials.len() < CONCURRENT_DIALS && !self.initial_addrs.is_empty() {
let Some(mut addr) = self.initial_addrs.pop_front() else {
continue;
};
let addr_clone = addr.clone();
let peer_id = multiaddr_pop_p2p(&mut addr);
let opts = match peer_id {
Some(peer_id) => DialOpts::peer_id(peer_id)
.condition(PeerCondition::NotDialing)
.addresses(vec![addr])
.build(),
None => DialOpts::unknown_peer_id().address(addr).build(),
};
info!("Trying to dial peer with address: {addr_clone}",);
match swarm.dial(opts) {
Ok(()) => {
info!("Dial attempt initiated for peer with address: {addr_clone}. Ongoing dial attempts: {}", self.ongoing_dials.len()+1);
self.ongoing_dials.insert(addr_clone);
}
Err(err) => match err {
DialError::LocalPeerId { .. } => {
warn!("Failed to dial peer with address: {addr_clone}. This is our own peer ID. Dialing the next peer");
}
DialError::NoAddresses => {
error!("Failed to dial peer with address: {addr_clone}. No addresses found. Dialing the next peer");
}
DialError::DialPeerConditionFalse(_) => {
warn!("We are already dialing the peer with address: {addr_clone}. Dialing the next peer. This error is harmless.");
}
DialError::Aborted => {
error!(" Pending connection attempt has been aborted for {addr_clone}. Dialing the next peer.");
}
DialError::WrongPeerId { obtained, .. } => {
error!("The peer identity obtained on the connection did not match the one that was expected. Expected: {peer_id:?}, obtained: {obtained}. Dialing the next peer.");
}
DialError::Denied { cause } => {
error!("The dialing attempt was denied by the remote peer. Cause: {cause}. Dialing the next peer.");
}
DialError::Transport(items) => {
error!("Failed to dial peer with address: {addr_clone}. Transport error: {items:?}. Dialing the next peer.");
}
},
}
}
}
fn should_we_continue_bootstrapping(&mut self, peers_in_rt: usize, verbose: bool) -> bool {
if self.bootstrap_completed {
if verbose {
info!("Initial bootstrap process has already completed successfully.");
} else {
trace!("Initial bootstrap process has already completed successfully.");
}
return false;
}
if peers_in_rt >= MAX_PEERS_BEFORE_TERMINATION {
self.bootstrap_completed = true;
self.initial_addrs.clear();
self.ongoing_dials.clear();
if verbose {
info!("Initial bootstrap process completed successfully. We have {peers_in_rt} peers in the routing table.");
} else {
trace!("Initial bootstrap process completed successfully. We have {peers_in_rt} peers in the routing table.");
}
return false;
}
if self.ongoing_dials.len() >= CONCURRENT_DIALS {
if verbose {
info!(
"Initial bootstrap has {} ongoing dials. Not dialing anymore.",
self.ongoing_dials.len()
);
} else {
debug!(
"Initial bootstrap has {} ongoing dials. Not dialing anymore.",
self.ongoing_dials.len()
);
}
return false;
}
if peers_in_rt < MAX_PEERS_BEFORE_TERMINATION && self.initial_addrs.is_empty() {
if verbose {
info!("We have {peers_in_rt} peers in RT, but no more addresses to dial. Stopping initial bootstrap.");
} else {
debug!("We have {peers_in_rt} peers in RT, but no more addresses to dial. Stopping initial bootstrap.");
}
return false;
}
if self.initial_addrs.is_empty() {
if verbose {
warn!("Initial bootstrap has no more addresses to dial.");
} else {
debug!("Initial bootstrap has no more addresses to dial.");
}
return false;
}
true
}
pub(crate) fn on_connection_established(
&mut self,
endpoint: &ConnectedPoint,
swarm: &mut Swarm<NodeBehaviour>,
peers_in_rt: usize,
) {
if self.bootstrap_completed {
return;
}
if let ConnectedPoint::Dialer { address, .. } = endpoint {
if !self.ongoing_dials.remove(address) {
if let Some(peer_id) = multiaddr_get_p2p(address) {
self.ongoing_dials.retain(|addr| {
if let Some(id) = multiaddr_get_p2p(addr) {
id != peer_id
} else {
true
}
});
}
}
}
self.trigger_bootstrapping_process(swarm, peers_in_rt);
}
pub(crate) fn on_outgoing_connection_error(
&mut self,
peer_id: Option<PeerId>,
swarm: &mut Swarm<NodeBehaviour>,
peers_in_rt: usize,
) {
if self.bootstrap_completed {
return;
}
match peer_id {
Some(peer_id) => {
self.ongoing_dials.retain(|addr| {
if let Some(id) = multiaddr_get_p2p(addr) {
id != peer_id
} else {
true
}
});
}
None => {
self.ongoing_dials
.retain(|addr| multiaddr_get_p2p(addr).is_some());
}
}
self.trigger_bootstrapping_process(swarm, peers_in_rt);
}
}