use std::{
collections::HashMap,
fmt::Debug,
net::{IpAddr, Ipv4Addr},
sync::Arc,
};
use addrman::Record;
use bitcoin::{
key::rand,
p2p::{address::AddrV2, ServiceFlags},
FeeRate, Network,
};
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use tokio::{
sync::{
mpsc::{self, Sender},
Mutex,
},
task::JoinHandle,
};
use crate::{
broadcaster::BroadcastQueue,
default_port_from_network,
network::{dns::bootstrap_dns, error::PeerError, peer::Peer, PeerId, PeerTimeoutConfig},
BlockType, Dialog, TrustedPeer, TrustedPeerInner,
};
use super::{AddressBook, ConnectionType, MainThreadMessage, PeerThreadMessage};
/// IPv4 loopback address (127.0.0.1). Passed as the last argument to `Record::new` when
/// `PeerMap::next_peer` turns a configured peer into a record.
// NOTE(review): presumably this argument is the "source" of the address -- confirm against addrman.
const LOCAL_HOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
/// Configured peers. `PeerMap::next_peer` pops entries off the back of this list, and
/// `PeerMap::add_trusted_peer` pushes onto the back.
type Whitelist = Vec<TrustedPeer>;
/// Bookkeeping that `PeerMap` keeps for every peer task it has spawned.
#[derive(Debug)]
pub(crate) struct ManagedPeer {
/// Record the peer was dispatched with; `PeerMap::set_services` updates its service flags.
record: Record,
/// Fee rate stored by `PeerMap::set_broadcast_min`; starts at `FeeRate::BROADCAST_MIN`.
broadcast_min: FeeRate,
/// Sending half of the channel whose receiving half was handed to the peer in `Peer::new`.
ptx: Sender<MainThreadMessage>,
/// Handle of the spawned task running `Peer::run`; `is_finished()` is used to detect dead peers.
handle: JoinHandle<Result<(), PeerError>>,
}
/// Owns the spawned peer tasks, keyed by `PeerId`, together with the state needed to pick and
/// connect to further peers.
#[derive(Debug)]
pub(crate) struct PeerMap {
/// Broadcast queue; a clone of this `Arc` is passed to every `Peer` that is created.
pub(crate) tx_queue: Arc<Mutex<BroadcastQueue>>,
/// When `true`, `next_peer` returns `None` once the whitelist is exhausted instead of
/// consulting the address book.
pub(crate) whitelist_only: bool,
/// Id given to the most recently dispatched peer; incremented before each new `Peer` is built.
current_id: PeerId,
/// Network used for the default port, for DNS bootstrapping, and passed to each `Peer`.
network: Network,
/// Block type passed to each `Peer`.
block_type: BlockType,
/// Sender cloned into each `Peer` on creation.
mtx: Sender<PeerThreadMessage>,
/// Every dispatched peer that has not yet been removed by `clean`.
map: HashMap<PeerId, ManagedPeer>,
/// Address book; shared with every `Peer` and updated on failed/tried/banned connections.
db: Arc<Mutex<AddressBook>>,
/// Connection strategy providing `can_connect`, `connect` and `is_proxy`.
connector: ConnectionType,
/// Configured peers that `next_peer` hands out before anything else.
whitelist: Whitelist,
/// Shared handle passed to each `Peer`.
dialog: Arc<Dialog>,
/// Timeouts passed to each `Peer`; `handshake_timeout` also bounds the connection attempt.
timeout_config: PeerTimeoutConfig,
}
impl PeerMap {
#[allow(clippy::too_many_arguments)]
pub fn new(
mtx: Sender<PeerThreadMessage>,
network: Network,
block_type: BlockType,
whitelist: Whitelist,
whitelist_only: bool,
dialog: Arc<Dialog>,
connection_type: ConnectionType,
timeout_config: PeerTimeoutConfig,
) -> Self {
Self {
tx_queue: Arc::new(Mutex::new(BroadcastQueue::new())),
whitelist_only,
current_id: PeerId(0),
network,
block_type,
mtx,
map: HashMap::new(),
db: Arc::new(Mutex::new(AddressBook::new())),
connector: connection_type,
whitelist,
dialog,
timeout_config,
}
}
pub async fn clean(&mut self) {
self.map.retain(|_, peer| !peer.handle.is_finished());
}
pub fn live(&self) -> usize {
self.map
.values()
.filter(|peer| !peer.handle.is_finished())
.count()
}
pub fn add_trusted_peer(&mut self, peer: TrustedPeer) {
self.whitelist.push(peer);
}
pub async fn dispatch(&mut self, loaded_peer: Record) -> Result<(), PeerError> {
let (ptx, prx) = mpsc::channel::<MainThreadMessage>(32);
let (addr, port) = loaded_peer.network_addr();
if !self.connector.can_connect(&addr) {
let mut db_lock = self.db.lock().await;
db_lock.failed(&loaded_peer);
return Err(PeerError::UnreachableSocketAddr);
}
crate::debug!(format!("Connecting to {:?}:{}", addr, port));
self.current_id.increment();
let mut peer = Peer::new(
self.current_id,
loaded_peer.clone(),
self.network,
self.block_type,
self.mtx.clone(),
prx,
Arc::clone(&self.dialog),
Arc::clone(&self.db),
self.timeout_config,
Arc::clone(&self.tx_queue),
);
let connection = self
.connector
.connect(addr, port, self.timeout_config.handshake_timeout)
.await;
let connection = match connection {
Ok(conn) => conn,
Err(e) => {
let mut db_lock = self.db.lock().await;
db_lock.failed(&loaded_peer);
return Err(e);
}
};
let is_proxy = self.connector.is_proxy();
let handle = tokio::spawn(async move { peer.run(connection, is_proxy).await });
self.map.insert(
self.current_id,
ManagedPeer {
record: loaded_peer,
broadcast_min: FeeRate::BROADCAST_MIN,
ptx,
handle,
},
);
Ok(())
}
pub fn set_broadcast_min(&mut self, nonce: PeerId, fee_rate: FeeRate) {
if let Some(peer) = self.map.get_mut(&nonce) {
peer.broadcast_min = fee_rate;
}
}
pub fn set_services(&mut self, nonce: PeerId, flags: ServiceFlags) {
if let Some(peer) = self.map.get_mut(&nonce) {
peer.record.update_service_flags(flags);
}
}
pub fn broadcast_min(&self) -> FeeRate {
self.map
.values()
.map(|peer| peer.broadcast_min)
.max()
.unwrap_or(FeeRate::BROADCAST_MIN)
}
pub fn peer_info(&self) -> Vec<(AddrV2, ServiceFlags)> {
self.map
.values()
.map(|peer| (peer.record.network_addr().0, peer.record.service_flags()))
.collect()
}
pub async fn send_message(&self, nonce: PeerId, message: MainThreadMessage) {
if let Some(peer) = self.map.get(&nonce) {
let _ = peer.ptx.send(message).await;
}
}
pub async fn broadcast(&self, message: MainThreadMessage) -> bool {
let active = self.map.values().filter(|peer| !peer.handle.is_finished());
let mut sends = Vec::new();
for peer in active {
let res = peer.ptx.send(message.clone()).await;
sends.push(res.is_ok());
}
sends.into_iter().any(|res| res)
}
pub async fn send_random(&self, message: MainThreadMessage) -> bool {
let mut rng = StdRng::from_entropy();
if let Some((_, peer)) = self.map.iter().choose(&mut rng) {
let res = peer.ptx.send(message).await;
return res.is_ok();
}
false
}
pub async fn next_peer(&mut self) -> Option<Record> {
while let Some(peer) = self.whitelist.pop() {
let port = peer
.port
.unwrap_or(default_port_from_network(&self.network));
let addr = match peer.address {
TrustedPeerInner::Addr(addr) => addr,
TrustedPeerInner::Hostname(host) => {
crate::debug!(format!("Resolving hostname {host}:{port}"));
match tokio::net::lookup_host((host.as_str(), port)).await {
Ok(iter) => {
let resolved: Vec<AddrV2> = iter
.map(|sa| match sa.ip() {
IpAddr::V4(ip) => AddrV2::Ipv4(ip),
IpAddr::V6(ip) => AddrV2::Ipv6(ip),
})
.collect();
if resolved.is_empty() {
crate::debug!(format!(
"Hostname {host} resolved to no addresses, skipping"
));
continue;
}
crate::debug!(format!(
"Resolved {host} to {} address(es)",
resolved.len()
));
for resolved_addr in resolved.into_iter().rev() {
self.whitelist.push(TrustedPeer {
address: TrustedPeerInner::Addr(resolved_addr),
port: Some(port),
known_services: peer.known_services,
});
}
continue;
}
Err(_) => {
crate::debug!(format!("Failed to resolve hostname {host}"));
continue;
}
}
}
};
crate::debug!("Using a configured peer");
return Some(Record::new(addr, port, peer.known_services, &LOCAL_HOST));
}
if self.whitelist_only {
return None;
}
let mut db_lock = self.db.lock().await;
if db_lock.is_empty() {
crate::debug!("Bootstrapping peers with DNS");
let new_peers = bootstrap_dns(self.network)
.await
.into_iter()
.map(|ip| match ip {
IpAddr::V4(ip) => AddrV2::Ipv4(ip),
IpAddr::V6(ip) => AddrV2::Ipv6(ip),
})
.collect::<Vec<AddrV2>>();
crate::debug!(format!("Adding {} sourced from DNS", new_peers.len()));
let addr_iter = new_peers
.into_iter()
.map(|ip| bitcoin::p2p::address::AddrV2Message {
time: 0,
services: ServiceFlags::NONE,
addr: ip,
port: default_port_from_network(&self.network),
});
let source = AddrV2::Ipv4(Ipv4Addr::new(1, 1, 1, 1));
db_lock.add_gossiped(addr_iter, &source);
}
db_lock.select()
}
pub async fn tried(&mut self, nonce: PeerId) {
if let Some(peer) = self.map.get(&nonce) {
let mut db = self.db.lock().await;
db.tried(&peer.record);
}
}
pub async fn ban(&mut self, nonce: PeerId) {
if let Some(peer) = self.map.get(&nonce) {
let mut db = self.db.lock().await;
db.ban(&peer.record);
}
}
}