use std::{
cmp::min,
collections::{HashMap, HashSet},
net::{IpAddr, SocketAddr, ToSocketAddrs},
sync::Arc,
time::{Duration, SystemTime},
};
use duration_string::DurationString;
use futures_util::future::join_all;
use itertools::Itertools;
use kaspa_addressmanager::{AddressManager, NetAddress};
use kaspa_core::{debug, info, warn};
use kaspa_p2p_lib::{common::ProtocolError, ConnectionError, Peer};
use kaspa_utils::triggers::SingleTrigger;
use parking_lot::Mutex as ParkingLotMutex;
use rand::{seq::SliceRandom, thread_rng};
use tokio::{
select,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex as TokioMutex,
},
time::{interval, MissedTickBehavior},
};
/// Maintains the node's peer connections: serves explicit connection requests,
/// tops up outbound connections towards a target and trims inbound connections
/// down to a limit. The work is driven by a background event loop.
pub struct ConnectionManager {
/// P2P adaptor used to list active peers, open connections and terminate peers.
p2p_adaptor: Arc<kaspa_p2p_lib::Adaptor>,
/// Number of outbound peers `handle_outbound_connections` tries to reach.
outbound_target: usize,
/// Maximum number of inbound peers kept; `handle_inbound_connections`
/// disconnects randomly chosen peers above this count.
inbound_limit: usize,
/// Host names resolved by `dns_seed` when outbound connections are still missing.
dns_seeders: &'static [&'static str],
/// Port paired with each seeder host name when resolving it.
default_port: u16,
/// Shared address manager: supplies candidate outbound addresses and receives
/// connection results, seeded addresses and bans.
address_manager: Arc<ParkingLotMutex<AddressManager>>,
/// Explicitly requested connections, keyed by peer address.
connection_requests: TokioMutex<HashMap<SocketAddr, ConnectionRequest>>,
/// Sending on this channel makes the event loop run an iteration right away.
force_next_iteration: UnboundedSender<()>,
/// Triggered by `stop` to make the event loop exit.
shutdown_signal: SingleTrigger,
}
/// Bookkeeping for a single explicitly requested peer connection.
#[derive(Clone, Debug)]
struct ConnectionRequest {
    /// Earliest point in time at which a connection attempt may be made.
    next_attempt: SystemTime,
    /// Permanent requests are kept and retried; others are dropped after being handled.
    is_permanent: bool,
    /// Number of failed attempts so far; feeds the retry backoff.
    attempts: u32,
}

impl ConnectionRequest {
    /// Builds a request with no recorded failures that is due immediately.
    fn new(is_permanent: bool) -> Self {
        let due_now = SystemTime::now();
        Self { attempts: 0, is_permanent, next_attempt: due_now }
    }
}
impl ConnectionManager {
pub fn new(
p2p_adaptor: Arc<kaspa_p2p_lib::Adaptor>,
outbound_target: usize,
inbound_limit: usize,
dns_seeders: &'static [&'static str],
default_port: u16,
address_manager: Arc<ParkingLotMutex<AddressManager>>,
) -> Arc<Self> {
let (tx, rx) = unbounded_channel::<()>();
let manager = Arc::new(Self {
p2p_adaptor,
outbound_target,
inbound_limit,
address_manager,
connection_requests: Default::default(),
force_next_iteration: tx,
shutdown_signal: SingleTrigger::new(),
dns_seeders,
default_port,
});
manager.clone().start_event_loop(rx);
manager.force_next_iteration.send(()).unwrap();
manager
}
/// Spawns the background task that drives the manager.
///
/// An iteration (`handle_event`) runs whenever a message arrives on `rx` or the
/// 30-second interval fires. The task ends once the shutdown signal is triggered.
fn start_event_loop(self: Arc<Self>, mut rx: UnboundedReceiver<()>) {
    let mut periodic = interval(Duration::from_secs(30));
    periodic.set_missed_tick_behavior(MissedTickBehavior::Delay);
    let event_loop = async move {
        // Re-check the trigger before every wait so a shutdown requested during
        // an iteration is noticed without another select round.
        while !self.shutdown_signal.trigger.is_triggered() {
            select! {
                _ = rx.recv() => self.clone().handle_event().await,
                _ = periodic.tick() => self.clone().handle_event().await,
                _ = self.shutdown_signal.listener.clone() => break,
            }
        }
        debug!("Connection manager event loop exiting");
    };
    tokio::spawn(event_loop);
}
/// Runs one iteration of the connection loop.
///
/// Takes a snapshot of the active peers indexed by address, then processes
/// connection requests, outbound connections and inbound connections, in that order.
async fn handle_event(self: Arc<Self>) {
    debug!("Starting connection loop iteration");
    let mut peer_by_address: HashMap<SocketAddr, Peer> = HashMap::new();
    for peer in self.p2p_adaptor.active_peers() {
        peer_by_address.insert(peer.net_address(), peer);
    }
    let snapshot = &peer_by_address;
    self.handle_connection_requests(snapshot).await;
    self.handle_outbound_connections(snapshot).await;
    self.handle_inbound_connections(snapshot).await;
}
/// Registers a request to connect to `address` and asks the event loop to run
/// an iteration right away.
///
/// Any existing request for the same address is replaced by a fresh one.
/// `is_permanent` controls whether the request is kept and retried after it
/// has been handled.
pub async fn add_connection_request(&self, address: SocketAddr, is_permanent: bool) {
    self.connection_requests.lock().await.insert(address, ConnectionRequest::new(is_permanent));
    // The receiving half is owned by the event-loop task, which exits after
    // `stop`. A failed send then only means there is no loop left to wake, so
    // it is ignored instead of being unwrapped into a panic.
    let _ = self.force_next_iteration.send(());
}
/// Signals the event loop to shut down by firing the shutdown trigger.
pub async fn stop(&self) {
    let shutdown = &self.shutdown_signal;
    shutdown.trigger.trigger();
}
/// Processes the explicitly requested connections and rebuilds the request map.
///
/// * A connected, non-permanent request is dropped.
/// * A request that is connected, or not yet due, is kept unchanged.
/// * Otherwise a connection is attempted. Non-permanent requests are dropped
///   whatever the outcome. A permanent request is reset on success, and on
///   failure rescheduled after `30s * 2^min(attempts, 4)`.
///
/// The request lock is held for the whole pass, including the connection attempts.
async fn handle_connection_requests(self: &Arc<Self>, peer_by_address: &HashMap<SocketAddr, Peer>) {
    // Failures beyond this count no longer lengthen the retry delay.
    const MAX_ACCOUNTABLE_ATTEMPTS: u32 = 4;

    let mut pending = self.connection_requests.lock().await;
    let mut retained = HashMap::with_capacity(pending.len());
    for (&address, request) in pending.iter() {
        let connected = peer_by_address.contains_key(&address);
        if connected && !request.is_permanent {
            // One-off request fulfilled: forget it.
            continue;
        }
        if connected || request.next_attempt > SystemTime::now() {
            // Nothing to do right now; carry the request over untouched.
            retained.insert(address, request.clone());
            continue;
        }

        debug!("Connecting to peer request {}", address);
        let failed = self.p2p_adaptor.connect_peer(address.to_string()).await.is_err();
        if failed {
            debug!("Failed connecting to peer request {}", address);
        }
        if !request.is_permanent {
            // One-off requests get a single attempt.
            continue;
        }

        let follow_up = if failed {
            let exponent = min(request.attempts, MAX_ACCOUNTABLE_ATTEMPTS);
            let retry_duration = Duration::from_secs(30u64 * 2u64.pow(exponent));
            debug!("Will retry peer request {} in {}", address, DurationString::from(retry_duration));
            ConnectionRequest { is_permanent: true, attempts: request.attempts + 1, next_attempt: SystemTime::now() + retry_duration }
        } else {
            ConnectionRequest::new(true)
        };
        retained.insert(address, follow_up);
    }
    *pending = retained;
}
/// Tries to bring the number of outbound peers up to `outbound_target`.
///
/// Candidate addresses come from the address manager, excluding the currently
/// connected outbound peers. Each round attempts as many connections as are
/// still missing, concurrently, and reports every outcome back to the address
/// manager. Rounds stop when the target is reached, the candidates run out or
/// shutdown is triggered. If connections are still missing afterwards and DNS
/// seeders are configured, `dns_seed` runs on a blocking thread.
async fn handle_outbound_connections(self: &Arc<Self>, peer_by_address: &HashMap<SocketAddr, Peer>) {
    let mut active_outbound: HashSet<kaspa_addressmanager::NetAddress> = HashSet::new();
    for peer in peer_by_address.values() {
        if peer.is_outbound() {
            active_outbound.insert(peer.net_address().into());
        }
    }

    let target = self.outbound_target;
    let outbound_count = active_outbound.len();
    if outbound_count >= target {
        return;
    }
    let mut missing = target - outbound_count;

    let mut candidates = self.address_manager.lock().iterate_prioritized_random_addresses(active_outbound);
    // `announce` selects the info-level log line: used for the first round and
    // again after any round in which at least one connection succeeded.
    let mut announce = true;
    let mut exhausted = false;
    while !exhausted && missing > 0 {
        if self.shutdown_signal.trigger.is_triggered() {
            return;
        }

        let mut attempted = Vec::with_capacity(missing);
        let mut attempts = Vec::with_capacity(missing);
        for _ in 0..missing {
            match candidates.next() {
                Some(net_addr) => {
                    let peer_address = SocketAddr::new(net_addr.ip.into(), net_addr.port).to_string();
                    debug!("Connecting to {}", &peer_address);
                    attempted.push(net_addr);
                    attempts.push(self.p2p_adaptor.connect_peer(peer_address));
                }
                None => {
                    exhausted = true;
                    break;
                }
            }
        }

        let established = target - missing;
        if announce && !attempts.is_empty() {
            info!(
                "Connection manager: has {}/{} outgoing P2P connections, trying to obtain {} additional connections...",
                established,
                target,
                attempts.len(),
            );
            announce = false;
        } else {
            debug!(
                "Connection manager: outgoing: {}/{} , connecting: {}, iterator: {}",
                established,
                target,
                attempts.len(),
                candidates.len(),
            );
        }

        let outcomes = join_all(attempts).await;
        for (outcome, net_addr) in outcomes.into_iter().zip(attempted) {
            match outcome {
                Ok(_) => {
                    self.address_manager.lock().mark_connection_success(net_addr);
                    missing -= 1;
                    announce = true;
                }
                // Already connected to this peer: neither a success nor a failure.
                Err(ConnectionError::ProtocolError(ProtocolError::PeerAlreadyExists(_))) => {
                    debug!("Failed connecting to {:?}, peer already exists", net_addr);
                }
                Err(err) => {
                    debug!("Failed connecting to {:?}, err: {}", net_addr, err);
                    self.address_manager.lock().mark_connection_failure(net_addr);
                }
            }
        }
    }

    if missing == 0 || self.dns_seeders.is_empty() {
        return;
    }
    // DNS resolution blocks, so it is moved off the async executor.
    let seeding_manager = self.clone();
    let _ = tokio::task::spawn_blocking(move || {
        seeding_manager.dns_seed(missing);
    })
    .await;
}
/// Enforces `inbound_limit`: when more inbound peers are connected than
/// allowed, randomly picks the excess ones and terminates them concurrently.
async fn handle_inbound_connections(self: &Arc<Self>, peer_by_address: &HashMap<SocketAddr, Peer>) {
    let inbound_peers: Vec<&Peer> = peer_by_address.values().filter(|peer| !peer.is_outbound()).collect();
    let inbound_count = inbound_peers.len();
    if inbound_count <= self.inbound_limit {
        return;
    }
    let excess = inbound_count - self.inbound_limit;
    let terminations: Vec<_> = inbound_peers
        .choose_multiple(&mut thread_rng(), excess)
        .map(|peer| {
            debug!("Disconnecting from {} because we're above the inbound limit", peer.net_address());
            self.p2p_adaptor.terminate(peer.key())
        })
        .collect();
    join_all(terminations).await;
}
/// Resolves the configured DNS seeders, in random order, and feeds every
/// returned address to the address manager.
///
/// Stops as soon as the seeders queried so far have yielded at least
/// `min_addresses_to_fetch` addresses in total. Seeders that fail to resolve
/// are logged and skipped. Performs blocking DNS resolution.
fn dns_seed(self: &Arc<Self>, mut min_addresses_to_fetch: usize) {
    let seeder_count = self.dns_seeders.len();
    let seeders_in_random_order = self.dns_seeders.choose_multiple(&mut thread_rng(), seeder_count);
    for &seeder in seeders_in_random_order {
        info!("Querying DNS seeder {}", seeder);
        let resolved = match (seeder, self.default_port).to_socket_addrs() {
            Err(e) => {
                warn!("Error connecting to DNS seeder {}: {}", seeder, e);
                continue;
            }
            Ok(resolved) => resolved,
        };
        let fetched = resolved.len();
        info!("Retrieved {} addresses from DNS seeder {}", fetched, seeder);
        {
            let mut address_manager = self.address_manager.lock();
            for addr in resolved {
                address_manager.add_address(NetAddress::new(addr.ip().into(), addr.port()));
            }
        }
        // Keep going only while some addresses are still owed.
        match min_addresses_to_fetch.checked_sub(fetched) {
            Some(still_missing) if still_missing > 0 => min_addresses_to_fetch = still_missing,
            _ => break,
        }
    }
}
/// Bans `ip` in the address manager after disconnecting every active peer
/// that uses it.
///
/// Does nothing when a permanent connection request exists for that IP.
pub async fn ban(&self, ip: IpAddr) {
    if self.ip_has_permanent_connection(ip).await {
        return;
    }
    let peers_on_ip = self.p2p_adaptor.active_peers().into_iter().filter(|peer| peer.net_address().ip() == ip);
    for peer in peers_on_ip {
        self.p2p_adaptor.terminate(peer.key()).await;
    }
    self.address_manager.lock().ban(ip.into());
}
/// Returns whether `address` should be treated as banned.
///
/// An address for which `is_permanent` returns true is never reported as
/// banned; otherwise the address manager's ban state for the IP decides.
pub async fn is_banned(&self, address: &SocketAddr) -> bool {
    if self.is_permanent(address).await {
        return false;
    }
    self.address_manager.lock().is_banned(address.ip().into())
}
/// Returns whether `address` has a connection request flagged as permanent.
///
/// One-off (non-permanent) requests are stored in the same map, so the
/// request's own flag is checked rather than mere presence of the address.
pub async fn is_permanent(&self, address: &SocketAddr) -> bool {
    let requests = self.connection_requests.lock().await;
    matches!(requests.get(address), Some(request) if request.is_permanent)
}
/// Returns whether any permanent connection request targets the given IP,
/// regardless of port.
pub async fn ip_has_permanent_connection(&self, ip: IpAddr) -> bool {
    let requests = self.connection_requests.lock().await;
    requests.iter().filter(|(_, request)| request.is_permanent).any(|(address, _)| address.ip() == ip)
}
}