use crate::metrics::METRICS;
use futures::{
future::{pending, Either},
FutureExt,
};
use std::{
net::SocketAddr,
pin::Pin,
sync::atomic::Ordering,
time::{Duration, Instant},
};
use tokio::time::{sleep, Sleep};
use tracing::info;
/// Back-off applied after an incoming-connection timer expires (6 hours).
///
/// `ConnectivityState::poll` adds this to the current time to compute the next instant at which
/// votes for the affected IP family are counted again.
pub const DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT: Duration = Duration::from_secs(6 * 60 * 60);
/// Number of incoming connections that must be seen while a wait timer is armed before the
/// corresponding IP family is reported as contactable.
const NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID: usize = 2;
/// Identifies which IP family's incoming-connection timer expired.
///
/// Returned by `ConnectivityState::poll` when a wait timer fires before enough incoming
/// connections were observed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimerFailure {
    /// The IPv4 wait timer expired.
    V4,
    /// The IPv6 wait timer expired.
    V6,
}
/// Tracks, separately for IPv4 and IPv6, whether incoming connections arrive after an ENR socket
/// update and when votes for each IP family may be counted again.
pub(crate) struct ConnectivityState {
/// Length of the wait timer armed on an ENR socket update. When `None`, no timer is ever armed
/// and `should_count_ip_vote` always returns `true`.
duration_for_incoming_connections: Option<Duration>,
/// Running IPv4 wait timer, if any. Armed by `enr_socket_update`; cleared once enough incoming
/// connections were counted or when the timer fires in `poll`.
ipv4_incoming_wait_time: Option<Pin<Box<Sleep>>>,
/// Running IPv6 wait timer, if any. Armed by `enr_socket_update`; cleared once enough incoming
/// connections were counted or when the timer fires in `poll`.
ipv6_incoming_wait_time: Option<Pin<Box<Sleep>>>,
/// Earliest instant at which IPv4 votes are counted again (pushed forward when the IPv4 timer
/// fires).
pub ipv4_next_connectivity_test: Instant,
/// Earliest instant at which IPv6 votes are counted again (pushed forward when the IPv6 timer
/// fires).
pub ipv6_next_connectivity_test: Instant,
/// Incoming IPv4 connections counted since the last IPv4 ENR socket update.
ipv4_incoming_count: usize,
/// Incoming IPv6 connections counted since the last IPv6 ENR socket update.
ipv6_incoming_count: usize,
}
impl ConnectivityState {
    /// Builds a fresh state with no timers armed and both counters at zero.
    ///
    /// `duration_for_incoming_connections` is the length of the timer armed by
    /// [`Self::enr_socket_update`]. With `None`, no timer is ever armed and
    /// [`Self::should_count_ip_vote`] always returns `true`.
    pub fn new(duration_for_incoming_connections: Option<Duration>) -> Self {
        Self {
            duration_for_incoming_connections,
            ipv4_incoming_count: 0,
            ipv6_incoming_count: 0,
            ipv4_incoming_wait_time: None,
            ipv6_incoming_wait_time: None,
            // Both families are eligible for a connectivity test straight away.
            ipv4_next_connectivity_test: Instant::now(),
            ipv6_next_connectivity_test: Instant::now(),
        }
    }

    /// Reports whether a vote for `socket`'s IP family should currently be counted.
    ///
    /// Always `true` when no wait duration is configured; otherwise `true` only once the
    /// family's next-connectivity-test instant has been reached.
    pub fn should_count_ip_vote(&self, socket: &SocketAddr) -> bool {
        let not_before = match socket {
            SocketAddr::V4(_) => self.ipv4_next_connectivity_test,
            SocketAddr::V6(_) => self.ipv6_next_connectivity_test,
        };
        // Short-circuits: the clock is only read when a wait duration is configured.
        self.duration_for_incoming_connections.is_none() || Instant::now() >= not_before
    }

    /// Records that the ENR socket for `socket`'s IP family was updated.
    ///
    /// Resets that family's incoming-connection counter and (re)arms its wait timer. Does
    /// nothing when no wait duration is configured.
    pub fn enr_socket_update(&mut self, socket: &SocketAddr) {
        let wait = match self.duration_for_incoming_connections {
            Some(wait) => wait,
            None => return,
        };
        // Select the counter/timer pair belonging to the updated family.
        let (incoming_count, wait_timer) = match socket {
            SocketAddr::V4(_) => (
                &mut self.ipv4_incoming_count,
                &mut self.ipv4_incoming_wait_time,
            ),
            SocketAddr::V6(_) => (
                &mut self.ipv6_incoming_count,
                &mut self.ipv6_incoming_wait_time,
            ),
        };
        *incoming_count = 0;
        *wait_timer = Some(Box::pin(sleep(wait)));
    }

    /// Registers an incoming connection on `socket`'s IP family.
    ///
    /// Connections are only counted while that family's wait timer is armed. Once the required
    /// number has been reached the timer is removed and the family's contactable metric is set
    /// to `true`.
    pub fn received_incoming_connection(&mut self, socket: &SocketAddr) {
        match socket {
            SocketAddr::V4(_) => {
                if self.ipv4_incoming_wait_time.is_some() {
                    self.ipv4_incoming_count += 1;
                    let enough_connections = self.ipv4_incoming_count
                        >= NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID;
                    if enough_connections {
                        info!(ip_version = "v4", "We are contactable");
                        // Dropping the timer stops `poll` from reporting a failure.
                        self.ipv4_incoming_wait_time = None;
                        METRICS.ipv4_contactable.store(true, Ordering::Relaxed);
                    }
                }
            }
            SocketAddr::V6(_) => {
                if self.ipv6_incoming_wait_time.is_some() {
                    self.ipv6_incoming_count += 1;
                    let enough_connections = self.ipv6_incoming_count
                        >= NUMBER_OF_INCOMING_CONNECTIONS_REQUIRED_TO_BE_VALID;
                    if enough_connections {
                        info!(ip_version = "v6", "We are contactable");
                        // Dropping the timer stops `poll` from reporting a failure.
                        self.ipv6_incoming_wait_time = None;
                        METRICS.ipv6_contactable.store(true, Ordering::Relaxed);
                    }
                }
            }
        }
    }

    /// Waits for whichever armed wait timer expires first and reports it.
    ///
    /// If no timer is armed the returned future never resolves. When a timer fires, the
    /// family's next connectivity test is pushed out by
    /// `DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT`, its timer is cleared and its contactable
    /// metric is set to `false`.
    pub async fn poll(&mut self) -> TimerFailure {
        let v4_timer = self.ipv4_incoming_wait_time.as_mut();
        let v6_timer = self.ipv6_incoming_wait_time.as_mut();

        let failure = match (v4_timer, v6_timer) {
            (Some(v4_sleep), Some(v6_sleep)) => {
                // Race both timers; the left future is the IPv4 one.
                match futures::future::select(v4_sleep, v6_sleep).await {
                    Either::Left(_) => TimerFailure::V4,
                    Either::Right(_) => TimerFailure::V6,
                }
            }
            (Some(v4_sleep), None) => v4_sleep.map(|_| TimerFailure::V4).await,
            (None, Some(v6_sleep)) => v6_sleep.map(|_| TimerFailure::V6).await,
            // Nothing is armed: stay pending forever.
            (None, None) => pending().await,
        };

        let next_test = Instant::now() + DURATION_UNTIL_NEXT_CONNECTIVITY_ATTEMPT;
        match failure {
            TimerFailure::V4 => {
                self.ipv4_next_connectivity_test = next_test;
                self.ipv4_incoming_wait_time = None;
                METRICS.ipv4_contactable.store(false, Ordering::Relaxed);
            }
            TimerFailure::V6 => {
                self.ipv6_next_connectivity_test = next_test;
                self.ipv6_incoming_wait_time = None;
                METRICS.ipv6_contactable.store(false, Ordering::Relaxed);
            }
        }
        failure
    }
}