use crate::relay_manager::{is_a_relayed_peer, RelayManager};
use crate::{
craft_valid_multiaddr_without_p2p, multiaddr_strip_p2p, Addresses, NetworkEvent, SwarmDriver,
};
use ant_protocol::version::IDENTIFY_PROTOCOL_STR;
use itertools::Itertools;
use libp2p::identify::Info;
use libp2p::kad::K_VALUE;
use libp2p::multiaddr::Protocol;
use libp2p::Multiaddr;
use std::collections::{hash_map, HashSet};
use std::time::{Duration, Instant};
/// Delay (180 seconds) added to `Instant::now()` to schedule the dial-back of a peer that is
/// placed in, or refreshed in, the dial queue after an identify exchange. Its value in
/// seconds plus 20 is also what is passed along with a DoNotDisturb request.
const DIAL_BACK_DELAY: Duration = Duration::from_secs(180);
impl SwarmDriver {
pub(super) fn handle_identify_event(&mut self, identify_event: libp2p::identify::Event) {
match identify_event {
libp2p::identify::Event::Received {
peer_id,
info,
connection_id,
} => {
let start = Instant::now();
self.handle_identify_received(peer_id, info, connection_id);
trace!("SwarmEvent handled in {:?}: identify", start.elapsed());
}
libp2p::identify::Event::Sent { .. } => debug!("identify: {identify_event:?}"),
libp2p::identify::Event::Pushed { .. } => debug!("identify: {identify_event:?}"),
libp2p::identify::Event::Error { .. } => warn!("identify: {identify_event:?}"),
}
}
fn handle_identify_received(
&mut self,
peer_id: libp2p::PeerId,
info: Info,
connection_id: libp2p::swarm::ConnectionId,
) {
debug!("identify: received info from {peer_id:?} on {connection_id:?}. Info: {info:?}");
let Some((_, addr_fom_connection, _)) = self.live_connected_peers.get(&connection_id)
else {
warn!("identify: received info for peer {peer_id:?} on {connection_id:?} that is not in the live connected peers");
return;
};
let our_identify_protocol = IDENTIFY_PROTOCOL_STR.read().expect("IDENTIFY_PROTOCOL_STR has been locked to write. A call to set_network_id performed. This should not happen.").to_string();
if info.protocol_version != our_identify_protocol {
warn!("identify: {peer_id:?} does not have the same protocol. Our IDENTIFY_PROTOCOL_STR: {our_identify_protocol:?}. Their protocol version: {:?}",info.protocol_version);
self.send_event(NetworkEvent::PeerWithUnsupportedProtocol {
our_protocol: our_identify_protocol,
their_protocol: info.protocol_version,
});
self.swarm.behaviour_mut().blocklist.block_peer(peer_id);
if let Some(dead_peer) = self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id) {
error!("Clearing out a protocol mismatch peer from RT. The peer pushed an incorrect identify info after being added: {peer_id:?}");
self.update_on_peer_removal(*dead_peer.node.key.preimage());
}
return;
}
let has_dialed = self.dialed_peers.contains(&peer_id);
let is_relayed_peer = is_a_relayed_peer(info.listen_addrs.iter());
let addrs = if !is_relayed_peer {
let addr = craft_valid_multiaddr_without_p2p(addr_fom_connection);
let Some(addr) = addr else {
warn!("identify: no valid multiaddr found for {peer_id:?} on {connection_id:?}");
return;
};
debug!("Peer {peer_id:?} is a normal peer, crafted valid multiaddress : {addr:?}.");
vec![addr]
} else {
let p2p_addrs = info
.listen_addrs
.iter()
.filter_map(|addr| RelayManager::craft_relay_address(addr, None))
.unique()
.collect::<Vec<_>>();
debug!("Peer {peer_id:?} is a relayed peer. Using p2p addr from identify directly: {p2p_addrs:?}");
p2p_addrs
};
if info.agent_version.contains("reachability-check-peer") {
debug!("Peer {peer_id:?} is requesting for a reachability check. Adding it to the dial queue. Not adding to RT.");
self.dial_queue.insert(
peer_id,
(
Addresses(addrs.clone()),
Instant::now() + DIAL_BACK_DELAY,
1,
),
);
return;
} else if info.agent_version.contains("client") {
debug!("Peer {peer_id:?} is a client. Not dialing or adding to RT.");
return;
}
if !is_relayed_peer && !self.initial_bootstrap.is_bootstrap_peer(&peer_id) {
if let Some(relay_manager) = self.relay_manager.as_mut() {
debug!("Adding candidate relay server {peer_id:?}, it's not a bootstrap node");
relay_manager.add_potential_candidates(&peer_id, &addrs, &info.protocols);
}
}
let (kbucket_full, already_present_in_rt, ilog2) =
if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
let ilog2 = kbucket.range().0.ilog2();
let num_peers = kbucket.num_entries();
let is_bucket_full = num_peers >= K_VALUE.into();
let already_present_in_rt = kbucket
.iter()
.any(|entry| entry.node.key.preimage() == &peer_id);
(is_bucket_full, already_present_in_rt, ilog2)
} else {
return;
};
if already_present_in_rt {
debug!("Received identify for {peer_id:?} that is already part of the RT. Checking if the addresses {addrs:?} are new.");
self.update_pre_existing_peer(peer_id, &addrs, is_relayed_peer);
} else if !self.local && !has_dialed {
let exists_in_dial_queue = self.dial_queue.contains_key(&peer_id);
if kbucket_full && !exists_in_dial_queue {
debug!("received identify for a full bucket {ilog2:?}, not dialing {peer_id:?} on {addrs:?}");
return;
}
info!("received identify info from undialed peer {peer_id:?} for not full kbucket {ilog2:?}, dialing back after {DIAL_BACK_DELAY:?}. Addrs: {addrs:?}");
let support_dnd = does_the_peer_support_dnd(&info);
let mut send_dnd = false;
match self.dial_queue.entry(peer_id) {
hash_map::Entry::Occupied(mut entry) => {
let (old_addrs, time, resets) = entry.get_mut();
*resets += 1;
*time = Instant::now() + DIAL_BACK_DELAY;
if *resets >= 3 {
send_dnd = true;
if support_dnd {
debug!("Peer {peer_id:?} has been reset 3 times. Will now send a DoNotDisturb request.");
} else {
*time = Instant::now();
warn!("Peer {peer_id:?} has been reset 3 times. It does not support DoNotDisturb. Dialing it back immediately.");
}
} else {
debug!(
"Peer {peer_id:?} has been re-added to the dial queue; Reset the dial back time to {DIAL_BACK_DELAY:?} (resets: {resets})",
);
}
for addr in addrs.iter() {
if !old_addrs.0.contains(addr) {
debug!("Adding new addr {addr:?} to dial queue for {peer_id:?}");
old_addrs.0.push(addr.clone());
} else {
debug!("Already have addr {addr:?} in dial queue for {peer_id:?}.");
}
}
if is_relayed_peer {
let mut removed = false;
old_addrs.0.retain(|addr| {
if addr.iter().any(|seg| matches!(seg, Protocol::P2pCircuit)) {
true
} else {
removed = true;
false
}
});
if removed {
debug!("Removed non-p2p addr from dial queue for {peer_id:?}. Remaining addrs: {old_addrs:?}");
}
}
}
hash_map::Entry::Vacant(entry) => {
debug!("Adding new addr {addrs:?} to dial queue for {peer_id:?}");
entry.insert((Addresses(addrs), Instant::now() + DIAL_BACK_DELAY, 1));
}
}
if send_dnd && support_dnd {
self.swarm
.behaviour_mut()
.do_not_disturb
.send_do_not_disturb_request(peer_id, DIAL_BACK_DELAY.as_secs() + 20);
}
} else {
debug!("identify: attempting to add addresses to routing table for {peer_id:?}. Addrs: {addrs:?}");
for addr in addrs.into_iter() {
let _routing_update = self
.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, addr);
}
}
}
fn update_pre_existing_peer(
&mut self,
peer_id: libp2p::PeerId,
new_addrs: &[Multiaddr],
is_relayed_peer: bool,
) {
if let Some(kbucket) = self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
let new_addrs = new_addrs.iter().cloned().collect::<HashSet<_>>();
let mut addresses_to_add = Vec::new();
let Some(entry) = kbucket
.iter()
.find(|entry| entry.node.key.preimage() == &peer_id)
else {
warn!("Peer {peer_id:?} is not part of the RT. Cannot update addresses.");
return;
};
let existing_addrs = entry
.node
.value
.iter()
.map(multiaddr_strip_p2p)
.collect::<HashSet<_>>();
addresses_to_add.extend(new_addrs.difference(&existing_addrs));
if is_relayed_peer {
let mut addresses_to_remove = Vec::new();
addresses_to_remove.extend(existing_addrs.difference(&new_addrs));
if !addresses_to_remove.is_empty() {
debug!("Removing addresses from RT for {peer_id:?} (relayed) as the new identify does not contain them: {addresses_to_remove:?}");
for multiaddr in addresses_to_remove {
let _routing_update = self
.swarm
.behaviour_mut()
.kademlia
.remove_address(&peer_id, multiaddr);
}
}
}
if !addresses_to_add.is_empty() {
debug!("Adding addresses to RT for {peer_id:?} as the new identify contains them: {addresses_to_add:?}");
for multiaddr in addresses_to_add {
let _routing_update = self
.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, multiaddr.clone());
}
}
}
}
}
/// Returns `true` when at least one protocol listed in the identify `info` has a string
/// form that contains `autonomi/dnd`, and `false` otherwise (including when the peer
/// advertises no protocols at all).
fn does_the_peer_support_dnd(info: &Info) -> bool {
    info.protocols
        .iter()
        .any(|advertised| advertised.to_string().contains("autonomi/dnd"))
}