use chrono::prelude::{DateTime, Utc};
use chrono::Duration;
use p2p::{msg::PeerAddrs, P2PConfig};
use rand::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::{mpsc, Arc};
use std::{cmp, str, thread, time};
use crate::core::global;
use crate::core::pow::Difficulty;
use crate::p2p;
use crate::p2p::types::PeerAddr;
use crate::p2p::ChainAdapter;
use crate::util::StopState;
pub const MAINNET_DNS_SEEDS: &[&str] = &[
"mainnet-seed.grinnode.live", "grincoin.org", "main.gri.mw", "mainnet.grinffindor.org", "main-seed.grin.money", "mainnet.fountainoffairfortune.it", ];
pub const TESTNET_DNS_SEEDS: &[&str] = &[
"testnet.grincoin.org", "test.gri.mw", "testnet.grinffindor.org", "test-seed.grin.money", "testnet.fountainoffairfortune.it", ];
pub fn connect_and_monitor(
p2p_server: Arc<p2p::Server>,
config: P2PConfig,
stop_state: Arc<StopState>,
) -> std::io::Result<thread::JoinHandle<()>> {
thread::Builder::new()
.name("seed".to_string())
.spawn(move || {
let peers = p2p_server.peers.clone();
let (tx, rx) = mpsc::channel();
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
let mut prev = DateTime::<Utc>::MIN_UTC;
let mut prev_expire_check = DateTime::<Utc>::MIN_UTC;
let mut prev_ping = Utc::now();
let mut start_attempt = 0;
let mut connecting_history: HashMap<PeerAddr, DateTime<Utc>> = HashMap::new();
loop {
if stop_state.is_stopped() {
break;
}
if stop_state.is_paused() {
thread::sleep(time::Duration::from_secs(1));
continue;
}
if Utc::now() - prev_expire_check > Duration::hours(1) {
peers.remove_expired();
prev_expire_check = Utc::now();
}
if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) {
listen_for_addrs(
peers.clone(),
p2p_server.clone(),
&rx,
&mut connecting_history,
);
monitor_peers(peers.clone(), p2p_server.config.clone(), tx.clone());
prev = Utc::now();
start_attempt = cmp::min(6, start_attempt + 1);
}
if Utc::now() - prev_ping > Duration::seconds(10) {
let total_diff = peers.total_difficulty();
let total_height = peers.total_height();
if let (Ok(total_diff), Ok(total_height)) = (total_diff, total_height) {
peers.check_all(total_diff, total_height);
prev_ping = Utc::now();
} else {
error!("failed to get peers difficulty and/or height");
}
}
thread::sleep(time::Duration::from_secs(1));
}
})
}
fn monitor_peers(peers: Arc<p2p::Peers>, config: p2p::P2PConfig, tx: mpsc::Sender<PeerAddr>) {
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
config.clone(),
);
let mut total_count = 0;
let mut banned_count = 0;
let mut healthy = vec![];
let mut defuncts = vec![];
let mut unknown = vec![];
for x in peers.all_peer_data().into_iter() {
match x.flags {
p2p::State::Banned => {
let interval = Utc::now().timestamp() - x.last_banned;
if interval >= config.ban_window() {
if let Err(e) = peers.unban_peer(x.addr) {
error!("failed to unban peer {}: {:?}", x.addr, e);
}
debug!(
"monitor_peers: unbanned {} after {} seconds",
x.addr, interval
);
} else {
banned_count += 1;
}
}
p2p::State::Healthy => healthy.push(x),
p2p::State::Defunct => defuncts.push(x),
p2p::State::Unknown => unknown.push(x.addr),
}
total_count += 1;
}
let peers_iter = || peers.iter().connected();
let peers_count = peers_iter().count();
let max_diff = peers_iter().max_difficulty().unwrap_or(Difficulty::zero());
let most_work_count = peers_iter().with_difficulty(|x| x >= max_diff).count();
debug!(
"monitor_peers: on {}:{}, {} connected ({} most_work). \
all {} = {} healthy + {} banned + {} defunct + {} unknown",
config.host,
config.port,
peers_count,
most_work_count,
total_count,
healthy.len(),
banned_count,
defuncts.len(),
unknown.len()
);
if total_count == 0 {
connect_to_seeds_and_peers(peers.clone(), tx.clone(), config);
return;
}
let enough_outbound = peers.enough_outbound_peers();
if !enough_outbound {
let mut connected_peers: Vec<PeerAddr> = vec![];
for p in peers
.iter()
.with_capabilities(p2p::Capabilities::PEER_LIST)
.connected()
{
trace!(
"monitor_peers: {}:{} ask {} for more peers",
config.host,
config.port,
p.info.addr,
);
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
connected_peers.push(p.info.addr)
}
let default_peers = PeerAddrs::default();
let peers_preferred = config.peers_preferred.as_ref().unwrap_or(&default_peers);
for p in peers_preferred.peers.iter() {
if !connected_peers.is_empty() {
if !connected_peers.contains(&p) {
let _ = tx.send(*p);
}
} else {
let _ = tx.send(*p);
}
}
}
let mut new_peers = vec![];
let max_peer_attempts = 128;
let max_attempt_delay = Duration::hours(1).num_seconds();
for hp in healthy
.iter()
.filter(|p| {
peers.get_connected_peer(p.addr).is_none()
&& (!enough_outbound
|| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay)
})
.choose_multiple(&mut thread_rng(), max_peer_attempts / 2)
{
new_peers.push(&hp.addr);
}
let req_unk_count = cmp::max(
max_peer_attempts / 2 - new_peers.len() + max_peer_attempts / 4,
max_peer_attempts / 4,
);
for upa in unknown
.iter()
.choose_multiple(&mut thread_rng(), req_unk_count)
{
new_peers.push(upa);
}
debug!(
"monitor_peers: check {} healthy, {} unknown, {} defuncts",
cmp::min(
new_peers.len() as i32,
((new_peers.len() - req_unk_count) as i32).abs()
),
cmp::min(new_peers.len(), req_unk_count),
max_peer_attempts - new_peers.len()
);
for dp in defuncts
.iter()
.filter(|p| {
!enough_outbound || Utc::now().timestamp() - p.last_attempt >= max_attempt_delay
})
.choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len())
{
new_peers.push(&dp.addr);
}
for pa in new_peers {
if let Ok(false) = peers.is_known(*pa) {
tx.send(*pa).unwrap();
}
}
}
fn connect_to_seeds_and_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>,
config: P2PConfig,
) {
let default_peers = PeerAddrs::default();
let peers_deny = config.peers_deny.as_ref().unwrap_or(&default_peers);
if let Some(peers) = config.peers_allow {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
return;
}
if let Some(peers) = config.peers_preferred.as_ref() {
for addr in peers.difference(peers_deny.as_slice()) {
let _ = tx.send(addr);
}
}
let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 128);
let peer_addrs = if peers.len() > 3 {
peers.iter().map(|p| p.addr).collect::<Vec<_>>()
} else {
seed_list(&config)
};
if peer_addrs.is_empty() {
warn!("No seeds were retrieved.");
}
for addr in peer_addrs {
if !peers_deny.as_slice().contains(&addr) {
let _ = tx.send(addr);
}
}
}
fn listen_for_addrs(
peers: Arc<p2p::Peers>,
p2p: Arc<p2p::Server>,
rx: &mpsc::Receiver<PeerAddr>,
connecting_history: &mut HashMap<PeerAddr, DateTime<Utc>>,
) {
let addrs: Vec<PeerAddr> = rx.try_iter().collect();
let connect_min_interval = 30;
let max_outbound_attempts = 128;
for addr in addrs.into_iter().take(max_outbound_attempts) {
let now = Utc::now();
if let Some(last_connect_time) = connecting_history.get(&addr) {
if *last_connect_time + Duration::seconds(connect_min_interval) > now {
debug!(
"peer_connect: ignore a duplicate request to {}. previous connecting time: {}",
addr,
last_connect_time.format("%H:%M:%S%.3f").to_string(),
);
continue;
} else if let Some(history) = connecting_history.get_mut(&addr) {
*history = now;
}
}
connecting_history.insert(addr, now);
let peers_c = peers.clone();
let p2p_c = p2p.clone();
thread::Builder::new()
.name("peer_connect".to_string())
.spawn(move || match p2p_c.connect(addr) {
Ok(p) => {
if peers_c.enough_outbound_peers() {
return;
}
if p.info.capabilities.contains(p2p::Capabilities::PEER_LIST) {
let _ = p.send_peer_request(p2p::Capabilities::PEER_LIST);
}
}
Err(_) => {
let _ = peers_c.update_state(addr, p2p::State::Defunct);
}
})
.expect("failed to launch peer_connect thread");
}
if connecting_history.len() > 100 {
let now = Utc::now();
let old: Vec<_> = connecting_history
.iter()
.filter(|&(_, t)| *t + Duration::seconds(connect_min_interval) < now)
.map(|(s, _)| *s)
.collect();
for addr in old {
connecting_history.remove(&addr);
}
}
}
fn seed_list(config: &P2PConfig) -> Vec<PeerAddr> {
match config.seeding_type {
p2p::Seeding::None => {
warn!("No seed configured, will stay solo until connected to");
vec![]
}
p2p::Seeding::List => match &config.seeds {
Some(seeds) => seeds.peers.clone(),
None => {
error!("Seeds must be configured for seeding type List");
vec![]
}
},
p2p::Seeding::DNSSeed => default_dns_seeds(),
_ => vec![],
}
}
fn default_dns_seeds() -> Vec<PeerAddr> {
let net_seeds = if global::is_testnet() {
TESTNET_DNS_SEEDS
} else {
MAINNET_DNS_SEEDS
};
resolve_dns_to_addrs(
&net_seeds
.iter()
.map(|s| {
s.to_string()
+ if global::is_testnet() {
":13414"
} else {
":3414"
}
})
.collect(),
)
}
pub fn resolve_dns_to_addrs(dns_records: &Vec<String>) -> Vec<PeerAddr> {
let mut addresses: Vec<PeerAddr> = vec![];
for dns in dns_records {
debug!("Retrieving addresses from dns {}", dns);
match dns.to_socket_addrs() {
Ok(addrs) => addresses.append(
&mut addrs
.map(PeerAddr)
.filter(|addr| !addresses.contains(addr))
.collect(),
),
Err(e) => debug!("Failed to resolve dns {:?} got error {:?}", dns, e),
};
}
debug!("Resolved addresses: {:?}", addresses);
addresses
}