use crate::locators::BlockLocators;
use snarkos_node_router::Router;
use snarkvm::prelude::Network;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
collections::BTreeMap,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{sync::Notify, time::timeout};
/// Internal state of the ping logic, shared (behind a mutex) between a [`Ping`] handle
/// and the background ping task.
struct PingInner<N: Network> {
/// Scheduled pings: maps the instant at which a ping becomes due to the peer that should receive it.
/// Being a map keyed by `Instant`, it can hold at most one peer per distinct instant.
next_ping: BTreeMap<Instant, SocketAddr>,
/// The block locators attached to outgoing pings, or `None` if none have been provided.
block_locators: Option<BlockLocators<N>>,
}
/// Handle to the ping logic. Constructing one spawns a background task that sends
/// ping messages to peers through the router.
pub struct Ping<N: Network> {
/// The router used to send ping messages.
router: Router<N>,
/// The state shared with the background ping task.
inner: Arc<Mutex<PingInner<N>>>,
/// Used to wake up the background ping task when the block locators are updated.
notify: Arc<Notify>,
}
impl<N: Network> PingInner<N> {
    /// Builds the initial state: no pings are scheduled yet, and `block_locators`
    /// (possibly `None`) is stored as the payload for outgoing pings.
    fn new(block_locators: Option<BlockLocators<N>>) -> Self {
        let next_ping = BTreeMap::new();
        Self { next_ping, block_locators }
    }
}
impl<N: Network> Ping<N> {
    /// Delay between receiving a pong from a peer and sending it the next ping.
    /// Also the longest the ping task sleeps when no ping is scheduled.
    const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);

    /// Creates the ping logic with an initial set of block locators and spawns the
    /// background ping task.
    ///
    /// Must be called from within a Tokio runtime, as it calls `tokio::spawn`.
    pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
        Self::with_locators(router, Some(block_locators))
    }

    /// Creates the ping logic without any block locators and spawns the background
    /// ping task. Pings carry no locators until [`Self::update_block_locators`] is called.
    ///
    /// Must be called from within a Tokio runtime, as it calls `tokio::spawn`.
    pub fn new_nosync(router: Router<N>) -> Self {
        Self::with_locators(router, None)
    }

    /// Shared constructor: builds the shared state and spawns the background ping task.
    fn with_locators(router: Router<N>, block_locators: Option<BlockLocators<N>>) -> Self {
        let notify = Arc::new(Notify::default());
        let inner = Arc::new(Mutex::new(PingInner::new(block_locators)));

        {
            // The task gets its own handles; it is detached and loops forever.
            let inner = inner.clone();
            let router = router.clone();
            let notify = notify.clone();
            tokio::spawn(async move {
                Self::ping_task(&inner, &router, &notify).await;
            });
        }

        Self { inner, router, notify }
    }

    /// Notifies the ping logic that a pong was received from `peer_ip`, scheduling the
    /// next ping to that peer `MAX_PING_INTERVAL` from now.
    pub fn on_pong_received(&self, peer_ip: SocketAddr) {
        let mut deadline = Instant::now() + Self::MAX_PING_INTERVAL;
        let mut inner = self.inner.lock();

        // Keep at most one scheduled ping per peer. Otherwise an extra pong (e.g. after
        // a reconnect while an old entry is still pending) would start a second,
        // never-ending ping cycle for the same peer.
        inner.next_ping.retain(|_, scheduled| *scheduled != peer_ip);

        // The schedule is keyed by deadline, so inserting at an occupied instant would
        // silently drop the peer already scheduled there. Nudge the deadline instead.
        while inner.next_ping.contains_key(&deadline) {
            deadline += Duration::from_nanos(1);
        }
        inner.next_ping.insert(deadline, peer_ip);

        // No need to wake the ping task: it never sleeps longer than MAX_PING_INTERVAL.
    }

    /// Notifies the ping logic that `peer_ip` connected, and immediately sends it a
    /// first ping with the current block locators.
    pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
        // The lock guard is a temporary and is released before the ping is sent.
        let locators = self.inner.lock().block_locators.clone();
        if !self.router.send_ping(peer_ip, locators) {
            warn!("Peer {peer_ip} connected and immediately disconnected?");
        }
    }

    /// Replaces the block locators attached to outgoing pings and wakes the ping task,
    /// which then pings every scheduled peer right away.
    pub fn update_block_locators(&self, locators: BlockLocators<N>) {
        self.inner.lock().block_locators = Some(locators);
        self.notify.notify_one();
    }

    /// Background task that sends out pings. Never returns.
    ///
    /// Each iteration either pings all scheduled peers (if woken up through `notify`)
    /// or only those whose deadline has passed, then sleeps until the earliest
    /// remaining deadline, or for `MAX_PING_INTERVAL` if nothing is scheduled.
    async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
        let mut new_block = false;

        loop {
            // Scope the lock guard so it is released before awaiting.
            let sleep_time = {
                let mut inner = inner.lock();
                let now = Instant::now();

                if new_block {
                    Self::ping_all_peers(&mut inner, router);
                    new_block = false;
                } else {
                    Self::ping_expired_peers(now, &mut inner, router);
                }

                match inner.next_ping.first_key_value() {
                    Some((time, _)) => time.saturating_duration_since(now),
                    None => Self::MAX_PING_INTERVAL,
                }
            };

            // `Ok` means the notification arrived before the timeout elapsed,
            // i.e. the block locators were updated.
            if timeout(sleep_time, notify.notified()).await.is_ok() {
                new_block = true;
            }
        }
    }

    /// Pings every peer whose scheduled deadline is at or before `now` and removes it
    /// from the schedule. Peers are re-added by [`Self::on_pong_received`].
    fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
        while let Some(entry) = inner.next_ping.first_entry() {
            // Entries are ordered by deadline, so the first one not yet due ends the scan.
            if *entry.key() > now {
                return;
            }
            let peer_ip = entry.remove();

            if !router.send_ping(peer_ip, inner.block_locators.clone()) {
                trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
            }
        }
    }

    /// Pings every scheduled peer, regardless of its deadline, and empties the schedule.
    fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
        // Taking the map leaves an empty schedule behind and avoids an intermediate Vec.
        let scheduled = std::mem::take(&mut inner.next_ping);

        for peer_ip in scheduled.into_values() {
            if !router.send_ping(peer_ip, inner.block_locators.clone()) {
                trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
            }
        }
    }
}