use std::collections::VecDeque;
use std::net;
use nakamoto_common::block::time::{Clock, LocalDuration, LocalTime};
use nakamoto_common::collections::HashMap;
use crate::fsm::PeerId;
use super::{
output::{Disconnect, SetTimer, Wire},
DisconnectReason,
};
/// Minimum time a peer stays idle before `received_wake` sends it another ping.
/// Also requested as a wake-up timer each time `received_wake` sends a ping.
pub const PING_INTERVAL: LocalDuration = LocalDuration::from_mins(2);
/// Thirty-second pong deadline. Not referenced elsewhere in this file;
/// NOTE(review): presumably what callers pass as `ping_timeout` to `PingManager::new` — confirm.
pub const PING_TIMEOUT: LocalDuration = LocalDuration::from_secs(30);
/// Upper bound on the number of latency samples kept per peer.
const MAX_RECORDED_LATENCIES: usize = 64;
/// Event type the ping manager passes to its `Wire` output.
///
/// It has no variants, so no value of this type can be constructed.
#[derive(Clone, Debug)]
pub enum Event {}
impl std::fmt::Display for Event {
    /// Format an event for display.
    ///
    /// `Event` has no variants, so this method can never be invoked with an actual
    /// value; the empty match below states that explicitly.
    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {}
    }
}
/// Where a peer stands in the ping/pong exchange.
#[derive(Debug)]
enum State {
/// A ping carrying `nonce` was sent at `since` and has not been answered yet.
AwaitingPong { nonce: u64, since: LocalTime },
/// No ping is outstanding; `since` is the time at which the last matching pong
/// was processed.
Idle { since: LocalTime },
}
/// Per-peer bookkeeping kept by the ping manager.
#[derive(Debug)]
struct Peer {
/// Socket address used when pinging or disconnecting the peer.
address: net::SocketAddr,
/// Current position in the ping/pong exchange.
state: State,
/// Ping-to-pong latency samples, newest first, capped at `MAX_RECORDED_LATENCIES`.
latencies: VecDeque<LocalDuration>,
}
impl Peer {
#[allow(dead_code)]
fn latency(&self) -> LocalDuration {
let sum: LocalDuration = self.latencies.iter().sum();
sum / self.latencies.len() as u32
}
fn record_latency(&mut self, sample: LocalDuration) {
self.latencies.push_front(sample);
self.latencies.truncate(MAX_RECORDED_LATENCIES);
}
}
/// Tracks negotiated peers, pings them periodically, and disconnects those that
/// fail to answer a ping in time.
#[derive(Debug)]
pub struct PingManager<U, C> {
/// Peers being tracked, keyed by peer id.
peers: HashMap<PeerId, Peer>,
/// How long a ping may go unanswered before the peer is disconnected.
ping_timeout: LocalDuration,
/// Generates ping nonces; a clone of it also builds the hasher of `peers`.
rng: fastrand::Rng,
/// Output handle used to send pings and pongs, request timers, and disconnect peers.
upstream: U,
/// Source of the current local time.
clock: C,
}
impl<U: Wire<Event> + SetTimer + Disconnect, C: Clock> PingManager<U, C> {
    /// Create a new ping manager.
    ///
    /// * `ping_timeout` — how long a ping may stay unanswered before the peer is
    ///   disconnected.
    /// * `rng` — generates ping nonces; a clone of it builds the peer map's hasher.
    /// * `upstream` — output handle for pings, pongs, timers and disconnects.
    /// * `clock` — source of the current local time.
    pub fn new(ping_timeout: LocalDuration, rng: fastrand::Rng, upstream: U, clock: C) -> Self {
        Self {
            // Built first, from a clone, because `rng` itself is moved into the struct below.
            peers: HashMap::with_hasher(rng.clone().into()),
            ping_timeout,
            rng,
            upstream,
            clock,
        }
    }

    /// Start tracking a freshly negotiated peer and send it a first ping.
    ///
    /// Any existing entry for the same peer id is replaced, along with its
    /// recorded latencies.
    pub fn peer_negotiated(&mut self, address: PeerId) {
        let nonce = self.rng.u64(..);
        let state = State::AwaitingPong {
            nonce,
            since: self.clock.local_time(),
        };

        // NOTE(review): unlike `received_wake`, no timer is requested after this
        // ping, so the timeout check for it relies on some other wake-up — confirm
        // this is intended.
        self.upstream.ping(address, nonce);

        let entry = Peer {
            address,
            state,
            latencies: VecDeque::new(),
        };
        self.peers.insert(address, entry);
    }

    /// Stop tracking a peer. Unknown peers are ignored.
    pub fn peer_disconnected(&mut self, addr: &PeerId) {
        self.peers.remove(addr);
    }

    /// Called on wake-up: disconnect peers whose pong is overdue and ping peers
    /// that have been idle for at least `PING_INTERVAL`.
    pub fn received_wake(&mut self) {
        let now = self.clock.local_time();

        // Borrow the fields separately so the peer map can be iterated mutably
        // while the output handle and the RNG are in use.
        let Self {
            peers,
            ping_timeout,
            rng,
            upstream,
            ..
        } = self;
        let timeout = *ping_timeout;

        for peer in peers.values_mut() {
            let target = peer.address;

            match peer.state {
                // The outstanding ping has gone unanswered for too long. The entry
                // itself is only removed by `peer_disconnected`.
                State::AwaitingPong { since, .. } if now - since >= timeout => {
                    upstream.disconnect(target, DisconnectReason::PeerTimeout("ping"));
                }
                // Enough time has passed since the last pong: ping again, and ask
                // for wake-ups after both the timeout and the ping interval.
                State::Idle { since } if now - since >= PING_INTERVAL => {
                    let nonce = rng.u64(..);

                    upstream
                        .ping(target, nonce)
                        .set_timer(timeout)
                        .set_timer(PING_INTERVAL);
                    peer.state = State::AwaitingPong { nonce, since: now };
                }
                // Nothing is due for this peer yet.
                State::AwaitingPong { .. } | State::Idle { .. } => {}
            }
        }
    }

    /// Handle a `ping` from a peer.
    ///
    /// A tracked peer is answered with a `pong` carrying the same nonce. Returns
    /// whether the peer was tracked, i.e. whether a `pong` was sent.
    pub fn received_ping(&mut self, addr: PeerId, nonce: u64) -> bool {
        let known = self.peers.contains_key(&addr);

        if known {
            self.upstream.pong(addr, nonce);
        }
        known
    }

    /// Handle a `pong` from a peer, received at time `now`.
    ///
    /// Returns `true` only when the peer is tracked, a ping is outstanding, and
    /// the nonce matches that ping; in that case the latency is recorded and the
    /// peer becomes idle. Any other `pong` is ignored and yields `false`.
    pub fn received_pong(&mut self, addr: PeerId, nonce: u64, now: LocalTime) -> bool {
        let peer = match self.peers.get_mut(&addr) {
            Some(peer) => peer,
            None => return false,
        };

        match peer.state {
            State::AwaitingPong {
                nonce: expected,
                since,
            } if expected == nonce => {
                peer.record_latency(now - since);
                peer.state = State::Idle { since: now };
                true
            }
            // Wrong nonce, or no ping outstanding.
            State::AwaitingPong { .. } | State::Idle { .. } => false,
        }
    }
}