use std::collections::{BTreeMap, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
use dashmap::DashMap;
use parking_lot::Mutex;
use smallvec::SmallVec;
use tokio::sync::mpsc;
use tracing::debug;
use crate::peer_state::PeerSource;
/// Upper bound, in seconds, applied to the reconnect backoff computed by `mark_dead`.
const MAX_BACKOFF_SECS: u64 = 3600;
/// Default eviction-ban lifetime used by `PeerStates::new`.
#[allow(dead_code)]
const EVICTION_BAN_DURATION: Duration = Duration::from_mins(30);
/// Default eviction-ban set capacity used by `PeerStates::new`.
#[allow(dead_code)]
const EVICTION_BAN_SET_CAP_FALLBACK: usize = 1024;
/// Backoff, in seconds, for the first failure (attempt 0).
const BASE_BACKOFF_SECS: u64 = 10;
/// Per-attempt multiplier: backoff is `BASE_BACKOFF_SECS * BACKOFF_FACTOR^attempt`, capped at
/// `MAX_BACKOFF_SECS`.
const BACKOFF_FACTOR: u64 = 6;
/// Retry window in seconds (24 hours). `mark_dead` drops a peer whose first failure is older
/// than this instead of scheduling another retry.
const RETRY_WINDOW_SECS: u64 = 86400;
/// Lifecycle state of a tracked peer address.
///
/// Transitions performed by `PeerStates`: `Queued -> Connecting` (`mark_connecting`),
/// `Connecting -> Live` (`mark_live`), `Connecting | Live -> Dead` (`mark_dead`) and
/// `Dead -> Queued` (`mark_queued_for_retry`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PeerLifecycle {
/// Known and sent to the connect queue; no attempt is in flight.
Queued,
/// A connection attempt has been started via `mark_connecting`.
Connecting,
/// Promoted by `mark_live`; backoff bookkeeping has been reset.
Live,
/// Failed via `mark_dead`; waiting for `mark_queued_for_retry`.
Dead,
}
/// Per-address bookkeeping stored in `PeerStates::states`.
#[derive(Debug)]
pub(crate) struct PeerEntry {
/// Current lifecycle state.
pub state: PeerLifecycle,
/// Discovery source recorded when the address was first added.
pub source: PeerSource,
/// Number of `mark_dead` calls since the last `mark_live`; used as the backoff exponent.
pub backoff_attempt: u32,
/// Time of the first failure in the current failure streak; cleared by `mark_live`.
pub first_failure_at: Option<Instant>,
/// Earliest retry time computed by `mark_dead`; cleared by `mark_live` and
/// `mark_queued_for_retry`.
pub retry_at: Option<Instant>,
/// Set by `mark_connecting`; also the key under which the address is indexed in
/// `connecting_since_index`. Later transitions do not clear it.
pub connecting_since: Option<Instant>,
/// Set by `set_tcp_connected`; reset to `None` on every `mark_connecting`.
pub tcp_connected_at: Option<Instant>,
}
/// Atomic counters mirroring how many peers are in each lifecycle state.
///
/// All accesses in this file use `Ordering::Relaxed`, so the counters are independent of
/// each other and a reader may observe a transition half-applied.
#[derive(Debug, Default)]
pub(crate) struct PeerPipelineStats {
/// Total tracked addresses: incremented by `add_if_not_seen`, decremented when
/// `mark_dead` removes a peer whose retry window is exhausted.
pub known: AtomicU32,
/// Peers in `PeerLifecycle::Queued`.
pub queued: AtomicU32,
/// Peers in `PeerLifecycle::Connecting`.
pub connecting: AtomicU32,
/// Peers in `PeerLifecycle::Live`.
pub live: AtomicU32,
/// Peers in `PeerLifecycle::Dead`.
pub dead: AtomicU32,
}
impl PeerPipelineStats {
    /// Copies every counter into a plain snapshot value.
    ///
    /// Each counter is loaded separately with relaxed ordering, so the result is not an
    /// atomic view across counters.
    pub fn snapshot(&self) -> PeerPipelineSnapshot {
        let read = |cell: &AtomicU32| cell.load(Ordering::Relaxed);
        PeerPipelineSnapshot {
            known: read(&self.known),
            queued: read(&self.queued),
            connecting: read(&self.connecting),
            live: read(&self.live),
            dead: read(&self.dead),
        }
    }

    /// Adds one to the counter that tracks `state`.
    fn inc(&self, state: PeerLifecycle) {
        let cell = self.counter(state);
        cell.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the counter that tracks `state`.
    fn dec(&self, state: PeerLifecycle) {
        let cell = self.counter(state);
        cell.fetch_sub(1, Ordering::Relaxed);
    }

    /// Maps a lifecycle state to its per-state counter (`known` has no state).
    fn counter(&self, state: PeerLifecycle) -> &AtomicU32 {
        let Self {
            queued,
            connecting,
            live,
            dead,
            ..
        } = self;
        match state {
            PeerLifecycle::Dead => dead,
            PeerLifecycle::Live => live,
            PeerLifecycle::Connecting => connecting,
            PeerLifecycle::Queued => queued,
        }
    }
}
pub use irontide_session_types::PeerPipelineSnapshot;
/// Shared registry of every known peer address and its lifecycle state, plus a bounded set
/// of temporary eviction bans.
pub(crate) struct PeerStates {
/// Per-address lifecycle entry.
states: DashMap<SocketAddr, PeerEntry>,
/// Counters kept in step with the transitions applied to `states`.
pub stats: PeerPipelineStats,
/// Every address that enters `Queued` is sent here (send errors are ignored).
queue_tx: mpsc::UnboundedSender<SocketAddr>,
/// Banned address -> time the ban was recorded.
eviction_bans: DashMap<SocketAddr, Instant>,
/// Insertion order of bans; the front is the first victim when the set is at capacity.
eviction_ban_order: Mutex<VecDeque<SocketAddr>>,
/// Maximum number of bans kept; `new_with_config` floors it to 1.
eviction_ban_cap: usize,
/// How long a ban stays in force after it is recorded.
eviction_ban_duration: Duration,
/// `connecting_since` -> addresses, maintained for peers that are `Connecting` and have not
/// yet reported a TCP connect. Range-scanned by `soft_reap_candidates_into`.
connecting_since_index: Mutex<BTreeMap<Instant, SmallVec<[SocketAddr; 2]>>>,
}
impl PeerStates {
/// Builds a `PeerStates` using the fallback eviction-ban capacity and duration.
#[allow(dead_code)]
pub fn new(queue_tx: mpsc::UnboundedSender<SocketAddr>) -> Self {
    let ban_cap = EVICTION_BAN_SET_CAP_FALLBACK;
    let ban_duration = EVICTION_BAN_DURATION;
    Self::new_with_config(queue_tx, ban_cap, ban_duration)
}
/// Builds an empty `PeerStates`.
///
/// `eviction_ban_cap` is raised to at least 1 so the ban set can always hold one entry.
pub fn new_with_config(
    queue_tx: mpsc::UnboundedSender<SocketAddr>,
    eviction_ban_cap: usize,
    eviction_ban_duration: Duration,
) -> Self {
    let eviction_ban_cap = eviction_ban_cap.max(1);
    let states = DashMap::new();
    let eviction_bans = DashMap::new();
    let eviction_ban_order = Mutex::new(VecDeque::new());
    let connecting_since_index = Mutex::new(BTreeMap::new());
    Self {
        states,
        stats: PeerPipelineStats::default(),
        queue_tx,
        eviction_bans,
        eviction_ban_order,
        eviction_ban_cap,
        eviction_ban_duration,
        connecting_since_index,
    }
}
/// Records `addr` in the bucket keyed by `ts`, creating the bucket on first use.
fn idx_insert(&self, ts: Instant, addr: SocketAddr) {
    let mut index = self.connecting_since_index.lock();
    let bucket = index.entry(ts).or_default();
    bucket.push(addr);
}
/// Removes `addr` from the bucket keyed by `ts` and prunes the bucket if it becomes empty.
///
/// A missing bucket or a bucket that does not contain `addr` is a no-op.
fn idx_remove(&self, ts: Instant, addr: SocketAddr) {
    let mut index = self.connecting_since_index.lock();
    let bucket_drained = match index.get_mut(&ts) {
        Some(bucket) => {
            bucket.retain(|queued| *queued != addr);
            bucket.is_empty()
        }
        None => false,
    };
    if bucket_drained {
        index.remove(&ts);
    }
}
/// Registers `addr` as a new `Queued` peer and sends it to the connect queue.
///
/// Returns `false` without touching anything when the address is already tracked, whatever
/// its state. A failed queue send is ignored.
pub fn add_if_not_seen(&self, addr: SocketAddr, source: PeerSource) -> bool {
    use dashmap::mapref::entry::Entry;
    let Entry::Vacant(slot) = self.states.entry(addr) else {
        return false;
    };
    let fresh = PeerEntry {
        state: PeerLifecycle::Queued,
        source,
        backoff_attempt: 0,
        first_failure_at: None,
        retry_at: None,
        connecting_since: None,
        tcp_connected_at: None,
    };
    // The guard returned by `insert` is a temporary and is released at the semicolon,
    // before the counters and the queue are touched.
    slot.insert(fresh);
    self.stats.known.fetch_add(1, Ordering::Relaxed);
    self.stats.inc(PeerLifecycle::Queued);
    let _ = self.queue_tx.send(addr);
    true
}
pub fn mark_connecting(&self, addr: SocketAddr) -> bool {
let idx_insert_ts = {
if let Some(mut entry) = self.states.get_mut(&addr)
&& entry.state == PeerLifecycle::Queued
{
self.stats.dec(PeerLifecycle::Queued);
entry.state = PeerLifecycle::Connecting;
let now = Instant::now();
entry.connecting_since = Some(now);
entry.tcp_connected_at = None;
self.stats.inc(PeerLifecycle::Connecting);
Some(now)
} else {
None
}
};
if let Some(ts) = idx_insert_ts {
self.idx_insert(ts, addr);
return true;
}
false
}
pub fn set_tcp_connected(&self, addr: SocketAddr) {
let idx_remove_ts = {
if let Some(mut entry) = self.states.get_mut(&addr)
&& entry.state == PeerLifecycle::Connecting
{
let ts = entry.connecting_since;
entry.tcp_connected_at = Some(Instant::now());
ts
} else {
None
}
};
if let Some(ts) = idx_remove_ts {
self.idx_remove(ts, addr);
}
}
pub fn mark_live(&self, addr: SocketAddr) {
let idx_remove_ts = {
if let Some(mut entry) = self.states.get_mut(&addr)
&& entry.state == PeerLifecycle::Connecting
{
self.stats.dec(PeerLifecycle::Connecting);
entry.state = PeerLifecycle::Live;
entry.backoff_attempt = 0;
entry.first_failure_at = None;
entry.retry_at = None;
self.stats.inc(PeerLifecycle::Live);
entry.connecting_since
} else {
None
}
};
if let Some(ts) = idx_remove_ts {
self.idx_remove(ts, addr);
}
}
/// Records a failure for a `Live` or `Connecting` peer.
///
/// Returns the backoff to wait before retrying, or `None` when the address is unknown, is
/// not `Live`/`Connecting`, or has been removed because its first failure is older than
/// `RETRY_WINDOW_SECS`.
pub fn mark_dead(&self, addr: SocketAddr) -> Option<Duration> {
    let mut entry = self.states.get_mut(&addr)?;
    let old_state = entry.state;
    // Only `Connecting` peers can still be present in the soft-reap index.
    let idx_remove_ts = match old_state {
        PeerLifecycle::Connecting => entry.connecting_since,
        PeerLifecycle::Live => None,
        PeerLifecycle::Queued | PeerLifecycle::Dead => return None,
    };
    let now = Instant::now();
    let first_failure = entry.first_failure_at.unwrap_or(now);
    let retry_window = Duration::from_secs(RETRY_WINDOW_SECS);
    let window_exhausted = first_failure.elapsed() > retry_window;

    // Both outcomes leave `old_state`.
    self.stats.dec(old_state);

    if window_exhausted {
        self.stats.known.fetch_sub(1, Ordering::Relaxed);
        let addr_to_remove = *entry.key();
        // Release the map guard before taking the index mutex or removing the entry.
        drop(entry);
        if let Some(ts) = idx_remove_ts {
            self.idx_remove(ts, addr_to_remove);
        }
        self.states.remove(&addr_to_remove);
        debug!(%addr_to_remove, "peer removed: 24hr retry window exhausted");
        return None;
    }

    let attempt = entry.backoff_attempt;
    let growth = BACKOFF_FACTOR.saturating_pow(attempt);
    let backoff = Duration::from_secs(
        BASE_BACKOFF_SECS
            .saturating_mul(growth)
            .min(MAX_BACKOFF_SECS),
    );
    entry.state = PeerLifecycle::Dead;
    entry.backoff_attempt = attempt.saturating_add(1);
    entry.first_failure_at = Some(first_failure);
    entry.retry_at = Some(now + backoff);
    self.stats.inc(PeerLifecycle::Dead);
    drop(entry);
    if let Some(ts) = idx_remove_ts {
        self.idx_remove(ts, addr);
    }
    Some(backoff)
}
pub fn mark_queued_for_retry(&self, addr: SocketAddr) -> bool {
if let Some(mut entry) = self.states.get_mut(&addr)
&& entry.state == PeerLifecycle::Dead
{
self.stats.dec(PeerLifecycle::Dead);
entry.state = PeerLifecycle::Queued;
entry.retry_at = None;
self.stats.inc(PeerLifecycle::Queued);
let _ = self.queue_tx.send(addr);
return true;
}
false
}
/// Returns `true` only when `addr` is tracked and currently `Live`.
pub fn is_live(&self, addr: &SocketAddr) -> bool {
    let state = self.states.get(addr).map(|entry| entry.state);
    state == Some(PeerLifecycle::Live)
}
/// Returns `true` when `addr` is tracked and in any state other than `Dead`.
#[allow(dead_code)]
pub fn is_active(&self, addr: &SocketAddr) -> bool {
    match self.states.get(addr) {
        Some(entry) => entry.state != PeerLifecycle::Dead,
        None => false,
    }
}
/// Returns the discovery source recorded for `addr`, or `None` if it is not tracked.
pub fn source(&self, addr: &SocketAddr) -> Option<PeerSource> {
    let entry = self.states.get(addr)?;
    Some(entry.source)
}
/// Allocating convenience wrapper around `soft_reap_candidates_into`.
#[allow(dead_code)]
pub fn soft_reap_candidates(&self, soft_timeout: Duration) -> Vec<SocketAddr> {
    let mut candidates = Vec::new();
    self.soft_reap_candidates_into(soft_timeout, &mut candidates);
    candidates
}
/// Fills `out` with every indexed address whose `connecting_since` is at least
/// `soft_timeout` in the past.
///
/// `out` is always cleared first. If `now - soft_timeout` is not representable, `out` is
/// left empty. In debug builds each indexed address that is still tracked is asserted to
/// be `Connecting` with no `tcp_connected_at`.
pub fn soft_reap_candidates_into(&self, soft_timeout: Duration, out: &mut Vec<SocketAddr>) {
    out.clear();
    let Some(cutoff) = Instant::now().checked_sub(soft_timeout) else {
        return;
    };
    let index = self.connecting_since_index.lock();
    let overdue = index
        .range(..=cutoff)
        .flat_map(|(_, bucket)| bucket.iter().copied());
    for addr in overdue {
        #[cfg(debug_assertions)]
        if let Some(entry) = self.states.get(&addr) {
            debug_assert!(
                entry.state == PeerLifecycle::Connecting
                    && entry.tcp_connected_at.is_none(),
                "soft_reap index/DashMap invariant violated for {addr:?}: \
                 state={:?} tcp_connected_at={:?}",
                entry.state,
                entry.tcp_connected_at,
            );
        }
        out.push(addr);
    }
}
/// Test hook: rewrites `connecting_since` for `addr` to `new_ts` and moves its index slot
/// accordingly. Unknown addresses are ignored.
#[cfg(test)]
pub(crate) fn test_backdate_connecting_since(&self, addr: SocketAddr, new_ts: Instant) {
    let previous = match self.states.get_mut(&addr) {
        Some(mut entry) => entry.connecting_since.replace(new_ts),
        None => return,
    };
    // The map guard has been released; only the index mutex is used from here on.
    if let Some(stale_ts) = previous {
        self.idx_remove(stale_ts, addr);
    }
    self.idx_insert(new_ts, addr);
}
/// Number of addresses currently tracked, in any lifecycle state.
#[allow(dead_code)]
pub fn len(&self) -> usize {
    let tracked = &self.states;
    tracked.len()
}
/// Bans `addr` starting now, evicting the oldest bans if the set is at capacity.
///
/// The FIFO queue is reconciled with the ban map on every call, under the queue lock:
///
/// * `is_eviction_banned` expires entries from the map only, so queue slots whose ban is
///   gone are dropped here. Otherwise the queue would grow without bound, and a stale slot
///   could later evict a fresh ban of the same address.
/// * Any existing slot for `addr` is dropped so a re-ban occupies exactly one slot, at the
///   back, matching its refreshed timestamp. Re-banning therefore never evicts another peer.
///
/// The reconciliation is linear in the number of queued bans, which is bounded by
/// `eviction_ban_cap`.
pub fn add_eviction_ban(&self, addr: SocketAddr) {
    let mut order = self.eviction_ban_order.lock();
    order.retain(|queued| *queued != addr && self.eviction_bans.contains_key(queued));
    // After the purge the queue holds only live bans other than `addr`, so its length is
    // the number of slots that would remain alongside the new ban.
    while order.len() >= self.eviction_ban_cap {
        let Some(victim) = order.pop_front() else {
            break;
        };
        self.eviction_bans.remove(&victim);
    }
    self.eviction_bans.insert(addr, Instant::now());
    order.push_back(addr);
}
pub fn is_eviction_banned(&self, addr: &SocketAddr) -> bool {
let Some(entry) = self.eviction_bans.get(addr) else {
return false;
};
let banned_at = *entry;
drop(entry); if banned_at.elapsed() >= self.eviction_ban_duration {
self.eviction_bans.remove(addr);
false
} else {
true
}
}
/// Test hook: number of entries currently held in the ban map (expired ones included
/// until they are purged).
#[cfg(test)]
pub fn eviction_ban_count(&self) -> usize {
    let bans = &self.eviction_bans;
    bans.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr};
/// Address `203.0.113.1:<port>` (TEST-NET-3 documentation range).
fn test_addr(port: u16) -> SocketAddr {
    let ip = Ipv4Addr::new(203, 0, 113, 1);
    SocketAddr::new(IpAddr::V4(ip), port)
}
/// Address `203.0.113.<last_octet>:<port>` (TEST-NET-3 documentation range).
fn test_addr_ip(last_octet: u8, port: u16) -> SocketAddr {
    let ip = Ipv4Addr::new(203, 0, 113, last_octet);
    SocketAddr::new(IpAddr::V4(ip), port)
}
/// Builds a default-configured `PeerStates` together with the receiving end of its queue.
fn make_peer_states() -> (PeerStates, mpsc::UnboundedReceiver<SocketAddr>) {
    let (queue_tx, queue_rx) = mpsc::unbounded_channel();
    let peers = PeerStates::new(queue_tx);
    (peers, queue_rx)
}
/// A never-before-seen address is accepted.
#[test]
fn add_if_not_seen_returns_true_for_new_peer() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    let accepted = peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert!(accepted);
}
/// A second add of the same address is rejected, even from a different source.
#[test]
fn add_if_not_seen_returns_false_for_duplicate() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    let first = peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert!(first);
    let second = peers.add_if_not_seen(peer, PeerSource::Dht);
    assert!(!second);
}
/// A freshly added peer starts `Queued` with clean backoff bookkeeping.
#[test]
fn add_if_not_seen_sets_queued_state() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    let stored = peers.states.get(&peer).expect("entry should exist");
    let PeerEntry {
        state,
        source,
        backoff_attempt,
        first_failure_at,
        retry_at,
        ..
    } = &*stored;
    assert_eq!(*state, PeerLifecycle::Queued);
    assert_eq!(*source, PeerSource::Tracker);
    assert_eq!(*backoff_attempt, 0);
    assert!(first_failure_at.is_none());
    assert!(retry_at.is_none());
}
/// Adding peers bumps `known` and `queued` and leaves the other counters at zero.
#[test]
fn counters_increment_on_add() {
    let (peers, _queue_rx) = make_peer_states();
    let initial = peers.stats.snapshot();
    assert_eq!(initial.known, 0);
    assert_eq!(initial.queued, 0);

    peers.add_if_not_seen(test_addr(6881), PeerSource::Tracker);
    peers.add_if_not_seen(test_addr_ip(2, 6882), PeerSource::Dht);

    let after = peers.stats.snapshot();
    assert_eq!(after.known, 2);
    assert_eq!(after.queued, 2);
    assert_eq!(after.connecting, 0);
    assert_eq!(after.live, 0);
    assert_eq!(after.dead, 0);
}
/// `mark_connecting` moves a queued peer to `Connecting` and shifts the counters.
#[test]
fn mark_connecting_transitions_from_queued() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);

    let moved = peers.mark_connecting(peer);
    assert!(moved);

    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.state, PeerLifecycle::Connecting);
    let counts = peers.stats.snapshot();
    assert_eq!(counts.queued, 0);
    assert_eq!(counts.connecting, 1);
}
/// A peer that is already `Connecting` cannot be marked connecting again.
#[test]
fn mark_connecting_rejects_non_queued() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    let repeated = peers.mark_connecting(peer);
    assert!(!repeated);
}
/// Unknown addresses are rejected by `mark_connecting`.
#[test]
fn mark_connecting_returns_false_for_unknown_addr() {
    let (peers, _queue_rx) = make_peer_states();
    let stranger = test_addr(9999);
    assert!(!peers.mark_connecting(stranger));
}
/// `mark_live` promotes a connecting peer and moves the counters with it.
#[test]
fn mark_live_transitions_from_connecting() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    peers.mark_live(peer);

    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.state, PeerLifecycle::Live);
    assert_eq!(stored.backoff_attempt, 0);
    assert!(stored.first_failure_at.is_none());

    let counts = peers.stats.snapshot();
    assert_eq!(counts.connecting, 0);
    assert_eq!(counts.live, 1);
}
/// `mark_live` wipes a failure streak planted directly on the entry.
#[test]
fn mark_live_resets_backoff_state() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    // Simulate earlier failures; the guard is released at the end of the `if let`.
    if let Some(mut planted) = peers.states.get_mut(&peer) {
        planted.backoff_attempt = 3;
        planted.first_failure_at = Some(Instant::now());
    }

    peers.mark_live(peer);

    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.backoff_attempt, 0);
    assert!(stored.first_failure_at.is_none());
    assert!(stored.retry_at.is_none());
}
/// `mark_live` leaves a peer that is still `Queued` untouched.
#[test]
fn mark_live_no_op_if_not_connecting() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_live(peer);
    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.state, PeerLifecycle::Queued);
}
/// The first failure of a live peer yields the base backoff and a `Dead` entry.
#[test]
fn mark_dead_from_live_returns_backoff_duration() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    peers.mark_live(peer);

    let wait = peers.mark_dead(peer);
    assert_eq!(wait, Some(Duration::from_secs(10)));

    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.state, PeerLifecycle::Dead);
    assert_eq!(stored.backoff_attempt, 1);
    assert!(stored.first_failure_at.is_some());
    assert!(stored.retry_at.is_some());

    let counts = peers.stats.snapshot();
    assert_eq!(counts.live, 0);
    assert_eq!(counts.dead, 1);
}
/// A peer that fails while still connecting gets the base backoff too.
#[test]
fn mark_dead_from_connecting_returns_backoff_duration() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);

    let wait = peers.mark_dead(peer);
    assert_eq!(wait, Some(Duration::from_secs(10)));

    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.state, PeerLifecycle::Dead);
    let counts = peers.stats.snapshot();
    assert_eq!(counts.connecting, 0);
    assert_eq!(counts.dead, 1);
}
/// A peer that is only `Queued` cannot be marked dead.
#[test]
fn mark_dead_returns_none_for_queued() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    let outcome = peers.mark_dead(peer);
    assert!(outcome.is_none());
}
/// Unknown addresses yield `None` from `mark_dead`.
#[test]
fn mark_dead_returns_none_for_unknown() {
    let (peers, _queue_rx) = make_peer_states();
    let stranger = test_addr(9999);
    assert!(peers.mark_dead(stranger).is_none());
}
/// The backoff formula grows by the factor per attempt and then sticks at the cap.
#[test]
fn backoff_increases_exponentially() {
    let backoff_for = |attempt: u32| {
        BASE_BACKOFF_SECS
            .saturating_mul(BACKOFF_FACTOR.saturating_pow(attempt))
            .min(MAX_BACKOFF_SECS)
    };

    let expected_secs = [10u64, 60, 360, 2160, 3600];
    for (attempt, expected) in (0u32..).zip(expected_secs) {
        let got = backoff_for(attempt);
        assert_eq!(
            got, expected,
            "attempt {attempt}: expected {expected}s, got {got}s"
        );
    }

    for attempt in 5u32..=10 {
        let got = backoff_for(attempt);
        assert_eq!(
            got, MAX_BACKOFF_SECS,
            "attempt {attempt} should cap at {MAX_BACKOFF_SECS}s"
        );
    }
}
/// `first_failure_at` is pinned by the first failure and survives a failed retry.
#[test]
fn mark_dead_preserves_first_failure_at_across_retries() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    peers.mark_live(peer);

    let _ = peers.mark_dead(peer);
    let first_failure = {
        let stored = peers.states.get(&peer).expect("entry should exist");
        stored
            .first_failure_at
            .expect("first_failure_at should be set")
    };

    peers.mark_queued_for_retry(peer);
    peers.mark_connecting(peer);
    let _ = peers.mark_dead(peer);
    let second_failure = {
        let stored = peers.states.get(&peer).expect("entry should exist");
        stored
            .first_failure_at
            .expect("first_failure_at should still be set")
    };

    assert_eq!(
        first_failure, second_failure,
        "first_failure_at must not change across retries without a successful connection"
    );
}
/// A failure whose streak began more than the retry window ago removes the peer entirely.
///
/// The test returns early when the platform clock cannot represent an instant that far back.
#[test]
fn mark_dead_removes_peer_after_24hr_window() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    peers.mark_live(peer);
    if let Some(mut planted) = peers.states.get_mut(&peer) {
        let past_window = Duration::from_secs(RETRY_WINDOW_SECS + 1);
        let Some(old_instant) = Instant::now().checked_sub(past_window) else {
            return;
        };
        planted.first_failure_at = Some(old_instant);
    }

    let outcome = peers.mark_dead(peer);
    assert!(outcome.is_none(), "peer past 24hr window should return None");
    assert!(
        peers.states.get(&peer).is_none(),
        "peer should be removed from the map"
    );
    let counts = peers.stats.snapshot();
    assert_eq!(counts.known, 0, "known counter should be decremented");
    assert_eq!(counts.live, 0);
    assert_eq!(counts.dead, 0);
}
/// Re-queuing a dead peer resets its state, fixes the counters and re-sends the address.
#[test]
fn mark_queued_for_retry_transitions_dead_to_queued() {
    let (peers, mut queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    peers.mark_live(peer);
    let _ = peers.mark_dead(peer);

    let requeued = peers.mark_queued_for_retry(peer);
    assert!(requeued);

    let stored = peers.states.get(&peer).expect("entry should exist");
    assert_eq!(stored.state, PeerLifecycle::Queued);
    assert!(stored.retry_at.is_none(), "retry_at should be cleared");
    let counts = peers.stats.snapshot();
    assert_eq!(counts.dead, 0);
    assert_eq!(counts.queued, 1);
    // The channel is FIFO, so the first message is the one sent by the initial add.
    let queued_addr = queue_rx
        .try_recv()
        .expect("should have received re-queued addr");
    assert_eq!(queued_addr, peer);
}
/// Only `Dead` peers can be re-queued; a `Queued` one is rejected.
#[test]
fn mark_queued_for_retry_rejects_non_dead() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    let requeued = peers.mark_queued_for_retry(peer);
    assert!(!requeued);
}
/// Unknown addresses cannot be re-queued.
#[test]
fn mark_queued_for_retry_returns_false_for_unknown() {
    let (peers, _queue_rx) = make_peer_states();
    let stranger = test_addr(9999);
    assert!(!peers.mark_queued_for_retry(stranger));
}
/// `is_live` is false until the peer has actually been promoted.
#[test]
fn is_live_reflects_state() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    assert!(!peers.is_live(&peer));

    peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert!(!peers.is_live(&peer));

    peers.mark_connecting(peer);
    assert!(!peers.is_live(&peer));

    peers.mark_live(peer);
    assert!(peers.is_live(&peer));
}
/// `is_active` holds for every tracked state except `Dead`.
#[test]
fn is_active_reflects_state() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    assert!(!peers.is_active(&peer));

    peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert!(peers.is_active(&peer), "Queued peer should be active");

    peers.mark_connecting(peer);
    assert!(peers.is_active(&peer), "Connecting peer should be active");

    peers.mark_live(peer);
    assert!(peers.is_active(&peer), "Live peer should be active");

    let _ = peers.mark_dead(peer);
    assert!(!peers.is_active(&peer), "Dead peer should not be active");
}
/// `len` counts distinct addresses; a duplicate add does not change it.
#[test]
fn len_tracks_entry_count() {
    let (peers, _queue_rx) = make_peer_states();
    assert_eq!(peers.len(), 0);

    let first = test_addr(6881);
    peers.add_if_not_seen(first, PeerSource::Tracker);
    assert_eq!(peers.len(), 1);

    peers.add_if_not_seen(test_addr_ip(2, 6882), PeerSource::Dht);
    assert_eq!(peers.len(), 2);

    peers.add_if_not_seen(first, PeerSource::Pex);
    assert_eq!(peers.len(), 2);
}
/// Walks one peer through the whole cycle and checks all five counters after each step.
#[test]
fn full_lifecycle_counters_are_consistent() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    // (known, queued, connecting, live, dead)
    let counts = || {
        let s = peers.stats.snapshot();
        (s.known, s.queued, s.connecting, s.live, s.dead)
    };

    peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert_eq!(counts(), (1, 1, 0, 0, 0));

    peers.mark_connecting(peer);
    assert_eq!(counts(), (1, 0, 1, 0, 0));

    peers.mark_live(peer);
    assert_eq!(counts(), (1, 0, 0, 1, 0));

    let _ = peers.mark_dead(peer);
    assert_eq!(counts(), (1, 0, 0, 0, 1));

    peers.mark_queued_for_retry(peer);
    assert_eq!(counts(), (1, 1, 0, 0, 0));
}
/// Three peers left in three different states are each counted once, in the right bucket.
#[test]
fn multiple_peers_tracked_independently() {
    let (peers, _queue_rx) = make_peer_states();
    let goes_live = test_addr(6881);
    let stays_connecting = test_addr_ip(2, 6882);
    let stays_queued = test_addr_ip(3, 6883);
    peers.add_if_not_seen(goes_live, PeerSource::Tracker);
    peers.add_if_not_seen(stays_connecting, PeerSource::Dht);
    peers.add_if_not_seen(stays_queued, PeerSource::Pex);

    peers.mark_connecting(goes_live);
    peers.mark_connecting(stays_connecting);
    peers.mark_live(goes_live);

    let counts = peers.stats.snapshot();
    assert_eq!(counts.known, 3);
    assert_eq!(counts.queued, 1);
    assert_eq!(counts.connecting, 1);
    assert_eq!(counts.live, 1);
    assert_eq!(counts.dead, 0);
}
/// Two consecutive failures step the attempt counter and the returned backoff.
#[test]
fn mark_dead_increments_backoff_attempt() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    let attempts = || peers.states.get(&peer).expect("entry exists").backoff_attempt;
    peers.add_if_not_seen(peer, PeerSource::Tracker);

    peers.mark_connecting(peer);
    let backoff1 = peers.mark_dead(peer);
    assert_eq!(backoff1, Some(Duration::from_secs(10)));
    let attempt_after_first = attempts();
    assert_eq!(attempt_after_first, 1);

    peers.mark_queued_for_retry(peer);
    peers.mark_connecting(peer);
    let backoff2 = peers.mark_dead(peer);
    assert_eq!(backoff2, Some(Duration::from_mins(1)));
    let attempt_after_second = attempts();
    assert_eq!(attempt_after_second, 2);
}
/// The snapshot type serializes to JSON with its field names as keys.
#[test]
fn snapshot_is_serializable() {
    let sample = PeerPipelineSnapshot {
        known: 42,
        queued: 10,
        connecting: 5,
        live: 20,
        dead: 7,
    };
    let json = serde_json::to_string(&sample).expect("should serialize");
    for fragment in ["\"known\":42", "\"live\":20"] {
        assert!(json.contains(fragment));
    }
}
/// `mark_connecting` stamps `connecting_since` with the current time and clears
/// `tcp_connected_at`.
#[test]
fn mark_connecting_sets_connecting_since() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);

    let before = Instant::now();
    peers.mark_connecting(peer);
    let after = Instant::now();

    let stored = peers.states.get(&peer).expect("entry should exist");
    let since = stored
        .connecting_since
        .expect("connecting_since should be set");
    assert!(before <= since && since <= after);
    assert!(stored.tcp_connected_at.is_none());
}
/// `set_tcp_connected` stamps `tcp_connected_at` with the current time.
#[test]
fn set_tcp_connected_marks_connecting_peer() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);

    let before = Instant::now();
    peers.set_tcp_connected(peer);
    let after = Instant::now();

    let stored = peers.states.get(&peer).expect("entry should exist");
    let connected_at = stored
        .tcp_connected_at
        .expect("tcp_connected_at should be set");
    assert!(before <= connected_at && connected_at <= after);
}
/// `set_tcp_connected` ignores a peer that is not in the `Connecting` state.
#[test]
fn set_tcp_connected_no_op_for_non_connecting() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.set_tcp_connected(peer);
    let stored = peers.states.get(&peer).expect("entry should exist");
    let untouched = stored.tcp_connected_at.is_none();
    assert!(
        untouched,
        "should not set tcp_connected_at for non-Connecting peer"
    );
}
/// Backdated connecting peers are reported; one that already has a TCP connection is not.
#[test]
fn soft_reap_candidates_finds_unreachable_peers() {
    let (peers, _queue_rx) = make_peer_states();
    let addr1 = test_addr(6881);
    let addr2 = test_addr_ip(2, 6882);
    let addr3 = test_addr_ip(3, 6883);
    for peer in [addr1, addr2, addr3] {
        peers.add_if_not_seen(peer, PeerSource::Tracker);
    }
    for peer in [addr1, addr2, addr3] {
        peers.mark_connecting(peer);
    }
    peers.set_tcp_connected(addr2);

    let backdated = Instant::now()
        .checked_sub(Duration::from_secs(5))
        .unwrap_or_else(Instant::now);
    peers.test_backdate_connecting_since(addr1, backdated);
    peers.test_backdate_connecting_since(addr3, backdated);

    let candidates = peers.soft_reap_candidates(Duration::from_secs(3));
    assert_eq!(
        candidates.len(),
        2,
        "should find 2 peers without TCP SYN-ACK past timeout"
    );
    assert!(candidates.contains(&addr1));
    assert!(candidates.contains(&addr3));
    assert!(
        !candidates.contains(&addr2),
        "peer with tcp_connected_at should be spared"
    );
}
/// A peer that only just started connecting is younger than the timeout and is kept.
#[test]
fn soft_reap_spares_recently_connecting_peers() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    let soft_timeout = Duration::from_secs(3);
    let candidates = peers.soft_reap_candidates(soft_timeout);
    assert!(
        candidates.is_empty(),
        "recently connecting peer should not be reaped"
    );
}
/// The buffer-filling variant reports the same set as the allocating one.
#[test]
fn soft_reap_candidates_into_matches_vec_variant() {
    let (peers, _queue_rx) = make_peer_states();
    let tracked = [
        test_addr_ip(1, 1000),
        test_addr_ip(2, 1001),
        test_addr_ip(3, 1002),
    ];
    for peer in tracked {
        peers.add_if_not_seen(peer, PeerSource::Tracker);
    }
    for peer in tracked {
        peers.mark_connecting(peer);
    }
    // Let the attempts age past the 10 ms timeout used below.
    std::thread::sleep(Duration::from_millis(50));

    let soft_timeout = Duration::from_millis(10);
    let mut a = peers.soft_reap_candidates(soft_timeout);
    let mut b: Vec<SocketAddr> = Vec::new();
    peers.soft_reap_candidates_into(soft_timeout, &mut b);
    a.sort();
    b.sort();
    assert_eq!(a, b, "into variant must match Vec variant");
}
/// Stale content in the caller's buffer is discarded.
#[test]
fn soft_reap_candidates_into_clears_caller_buffer() {
    let (peers, _queue_rx) = make_peer_states();
    let mut scratch = vec![test_addr(42), test_addr(43)];
    let soft_timeout = Duration::from_hours(1);
    peers.soft_reap_candidates_into(soft_timeout, &mut scratch);
    assert!(scratch.is_empty(), "into variant must clear caller buffer");
}
/// With nothing tracked the buffer ends up empty, even if it held entries beforehand.
#[test]
fn soft_reap_candidates_into_with_no_peers() {
    let (peers, _queue_rx) = make_peer_states();
    let mut scratch = vec![test_addr(99), test_addr(100)];
    let soft_timeout = Duration::from_millis(100);
    peers.soft_reap_candidates_into(soft_timeout, &mut scratch);
    assert!(
        scratch.is_empty(),
        "empty PeerStates must produce empty buffer (clearing stale content)"
    );
}
/// `live` only rises at `mark_live`, not at any of the earlier steps.
#[test]
fn promotion_flow_increments_live_count() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    let live = || peers.stats.snapshot().live;
    let connecting = || peers.stats.snapshot().connecting;

    peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert_eq!(live(), 0);

    peers.mark_connecting(peer);
    assert_eq!(live(), 0);
    assert_eq!(connecting(), 1);

    peers.set_tcp_connected(peer);
    assert_eq!(live(), 0);

    peers.mark_live(peer);
    assert_eq!(live(), 1);
    assert_eq!(connecting(), 0);
}
/// An address is banned only after `add_eviction_ban` has been called for it.
#[test]
fn test_eviction_ban_blocks_peer() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(7001);

    let banned_before = peers.is_eviction_banned(&peer);
    assert!(!banned_before, "should not be banned before add");

    peers.add_eviction_ban(peer);
    let banned_after = peers.is_eviction_banned(&peer);
    assert!(banned_after, "should be banned immediately after add");
}
/// A fresh ban blocks and carries a recent timestamp, and a ban lapses once its configured
/// duration has elapsed.
///
/// The default 30-minute duration cannot be waited out in a unit test, so expiry is
/// exercised on a second instance configured with a 20 ms duration.
#[test]
fn test_eviction_ban_expires() {
    let (ps, _rx) = make_peer_states();
    let addr = test_addr(7002);
    ps.add_eviction_ban(addr);
    assert!(ps.is_eviction_banned(&addr), "fresh ban should block");
    {
        let entry = ps.eviction_bans.get(&addr).expect("ban entry should exist");
        assert!(
            entry.elapsed() < Duration::from_secs(5),
            "ban timestamp should be recent"
        );
    }

    let (short_tx, _short_rx) = mpsc::unbounded_channel();
    let short_lived = PeerStates::new_with_config(
        short_tx,
        EVICTION_BAN_SET_CAP_FALLBACK,
        Duration::from_millis(20),
    );
    short_lived.add_eviction_ban(addr);
    // Sleeping longer than the ban duration guarantees the ban is past its lifetime.
    std::thread::sleep(Duration::from_millis(30));
    assert!(
        !short_lived.is_eviction_banned(&addr),
        "ban should lapse after its duration"
    );
    assert_eq!(
        short_lived.eviction_ban_count(),
        0,
        "expired ban should be purged on lookup"
    );
}
/// With capacity 4, a fifth ban pushes out the first one and keeps the rest.
#[test]
fn banned_set_fifo_cap_drops_oldest_when_full() {
    let (tx, _rx) = mpsc::unbounded_channel();
    let peers = PeerStates::new_with_config(tx, 4, EVICTION_BAN_DURATION);
    let [p1, p2, p3, p4, p5] =
        [1u8, 2, 3, 4, 5].map(|n| test_addr_ip(n, 8000 + u16::from(n)));

    for early in [p1, p2, p3, p4] {
        peers.add_eviction_ban(early);
    }
    assert_eq!(peers.eviction_ban_count(), 4, "all 4 should fit at cap=4");

    peers.add_eviction_ban(p5);
    assert_eq!(peers.eviction_ban_count(), 4, "cap should hold steady");
    assert!(
        !peers.is_eviction_banned(&p1),
        "oldest entry should be dropped"
    );
    for survivor in [p2, p3, p4, p5] {
        assert!(peers.is_eviction_banned(&survivor));
    }
}
/// A zero ban duration means a ban is already expired on its first lookup.
#[test]
fn ban_duration_zero_expires_immediately() {
    let (tx, _rx) = mpsc::unbounded_channel();
    let no_duration = Duration::from_secs(0);
    let peers = PeerStates::new_with_config(tx, 1024, no_duration);
    let peer = test_addr(8100);
    peers.add_eviction_ban(peer);
    let still_banned = peers.is_eviction_banned(&peer);
    assert!(
        !still_banned,
        "ban with zero duration must expire instantly"
    );
}
/// A requested capacity of zero is floored to one, so a single ban still fits.
#[test]
fn banned_set_cap_floor_is_one() {
    let (tx, _rx) = mpsc::unbounded_channel();
    let requested_cap = 0;
    let peers = PeerStates::new_with_config(tx, requested_cap, EVICTION_BAN_DURATION);
    let only = test_addr_ip(1, 8101);
    peers.add_eviction_ban(only);
    let held = peers.is_eviction_banned(&only);
    assert!(held, "cap-floor of 1 must hold the entry");
}
/// An address that was never banned is reported as not banned.
#[test]
fn test_eviction_ban_not_present_returns_false() {
    let (peers, _queue_rx) = make_peer_states();
    let unknown = test_addr(7003);
    let banned = peers.is_eviction_banned(&unknown);
    assert!(!banned, "unknown address should not be banned");
}
/// Bans are per-address: banning one peer never affects another.
#[test]
fn test_eviction_ban_blocks_specific_peer_only() {
    let (peers, _queue_rx) = make_peer_states();
    let banned_addr = test_addr_ip(10, 7010);
    let innocent_addr = test_addr_ip(20, 7020);
    let another_addr = test_addr_ip(30, 7030);

    peers.add_eviction_ban(banned_addr);
    assert!(
        peers.is_eviction_banned(&banned_addr),
        "banned peer should be blocked"
    );
    assert!(
        !peers.is_eviction_banned(&innocent_addr),
        "innocent peer should not be blocked by another peer's ban"
    );
    assert!(
        !peers.is_eviction_banned(&another_addr),
        "unrelated peer should not be blocked"
    );

    peers.add_eviction_ban(innocent_addr);
    assert!(
        peers.is_eviction_banned(&banned_addr),
        "first ban should still be active"
    );
    assert!(
        peers.is_eviction_banned(&innocent_addr),
        "second peer should now be banned"
    );
    assert!(
        !peers.is_eviction_banned(&another_addr),
        "third peer should remain unaffected"
    );
}
/// Number of distinct timestamp buckets currently in the soft-reap index.
fn idx_buckets(ps: &PeerStates) -> usize {
    let index = ps.connecting_since_index.lock();
    index.len()
}
fn idx_total_addrs(ps: &PeerStates) -> usize {
ps.connecting_since_index
.lock()
.values()
.map(smallvec::SmallVec::len)
.sum()
}
/// Whether `addr` appears in any bucket of the soft-reap index.
fn idx_contains(ps: &PeerStates, addr: SocketAddr) -> bool {
    let index = ps.connecting_since_index.lock();
    index.values().flatten().any(|queued| *queued == addr)
}
/// `mark_connecting` indexes the peer under exactly its `connecting_since` instant.
#[test]
fn index_inserted_on_mark_connecting() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    assert_eq!(idx_total_addrs(&peers), 0, "Queued state must not be indexed");

    peers.mark_connecting(peer);
    assert_eq!(idx_total_addrs(&peers), 1);
    assert!(idx_contains(&peers, peer));

    let entry_ts = {
        let stored = peers.states.get(&peer).expect("entry exists");
        stored.connecting_since.expect("connecting_since set")
    };
    let keyed = peers.connecting_since_index.lock().contains_key(&entry_ts);
    assert!(keyed);
}
/// A TCP connect takes the peer out of the index and prunes its empty bucket.
#[test]
fn index_removed_on_set_tcp_connected() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    assert_eq!(idx_total_addrs(&peers), 1);

    peers.set_tcp_connected(peer);
    assert_eq!(idx_total_addrs(&peers), 0);
    assert_eq!(idx_buckets(&peers), 0, "empty buckets must be pruned");
}
/// Promotion to `Live` empties the index even without a prior `set_tcp_connected`.
#[test]
fn index_removed_on_mark_live() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);

    peers.mark_live(peer);

    assert_eq!(idx_total_addrs(&peers), 0);
    assert_eq!(idx_buckets(&peers), 0);
}
/// A failure inside the retry window removes the peer from the index.
#[test]
fn index_removed_on_mark_dead_normal() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    assert_eq!(idx_total_addrs(&peers), 1);

    let wait = peers.mark_dead(peer);
    assert!(wait.is_some(), "should return backoff in retry window");

    assert_eq!(idx_total_addrs(&peers), 0);
    assert_eq!(idx_buckets(&peers), 0);
}
/// When the retry window is exhausted, both the map entry and the index slot go away.
///
/// The test returns early when the platform clock cannot represent an instant that far back.
#[test]
fn index_removed_on_mark_dead_exhausted() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    assert_eq!(idx_total_addrs(&peers), 1);
    if let Some(mut planted) = peers.states.get_mut(&peer) {
        let past_window = Duration::from_secs(RETRY_WINDOW_SECS + 1);
        let Some(old) = Instant::now().checked_sub(past_window) else {
            return;
        };
        planted.first_failure_at = Some(old);
    }

    let outcome = peers.mark_dead(peer);
    assert!(outcome.is_none(), "exhausted path returns None");
    assert!(
        peers.states.get(&peer).is_none(),
        "DashMap entry must be removed in exhausted path"
    );
    assert_eq!(idx_total_addrs(&peers), 0, "index must also be empty");
    assert_eq!(idx_buckets(&peers), 0);
}
/// Failing a `Live` peer finds nothing to remove from the index and leaves it empty.
#[test]
fn index_removed_on_mark_dead_from_live_no_op() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    peers.mark_live(peer);
    assert_eq!(idx_total_addrs(&peers), 0, "Live state must not be indexed");

    let _ = peers.mark_dead(peer);

    assert_eq!(idx_total_addrs(&peers), 0);
    assert_eq!(idx_buckets(&peers), 0);
}
/// Re-queuing a dead peer does not put anything back into the index.
#[test]
fn mark_queued_for_retry_leaves_index_empty() {
    let (peers, _queue_rx) = make_peer_states();
    let peer = test_addr(6881);
    peers.add_if_not_seen(peer, PeerSource::Tracker);
    peers.mark_connecting(peer);
    let _ = peers.mark_dead(peer);
    assert_eq!(idx_total_addrs(&peers), 0);

    peers.mark_queued_for_retry(peer);

    let indexed = idx_total_addrs(&peers);
    assert_eq!(
        indexed, 0,
        "mark_queued_for_retry must not touch the index"
    );
}
/// Runs one peer through connecting -> live -> dead -> queued-for-retry and then
/// marks it connecting a second time.
///
/// Expects the second `connecting_since` to be strictly later than the first,
/// and the index to contain exactly one address keyed by the second timestamp,
/// with no key left over from the first cycle.
#[test]
fn index_handles_relifecycle() {
    let (ps, _rx) = make_peer_states();
    let peer = test_addr(6881);

    // Reads the peer's current `connecting_since`; the map guard is released
    // as soon as the value has been copied out.
    let connecting_since = || {
        ps.states
            .get(&peer)
            .expect("entry exists")
            .connecting_since
            .expect("connecting_since set")
    };

    ps.add_if_not_seen(peer, PeerSource::Tracker);
    ps.mark_connecting(peer);
    let first_cycle = connecting_since();

    ps.mark_live(peer);
    let _ = ps.mark_dead(peer);
    ps.mark_queued_for_retry(peer);
    assert_eq!(idx_total_addrs(&ps), 0);

    // Give the clock time to advance so the two timestamps can be told apart.
    std::thread::sleep(Duration::from_millis(2));
    ps.mark_connecting(peer);
    let second_cycle = connecting_since();

    assert!(
        second_cycle > first_cycle,
        "second cycle must yield a later connecting_since"
    );
    assert_eq!(idx_total_addrs(&ps), 1);

    let index = ps.connecting_since_index.lock();
    assert!(
        index.contains_key(&second_cycle),
        "second cycle's Instant must be the index key"
    );
    assert!(
        !index.contains_key(&first_cycle),
        "first cycle's Instant must not linger in the index"
    );
}
/// Backdates two connecting peers to the very same `Instant` and expects them to
/// share a single index bucket, and both to be returned by
/// `soft_reap_candidates` for a 3-second timeout.
#[test]
fn index_handles_collision() {
    let (ps, _rx) = make_peer_states();
    let addr1 = test_addr_ip(1, 6881);
    let addr2 = test_addr_ip(2, 6882);

    for addr in [addr1, addr2] {
        ps.add_if_not_seen(addr, PeerSource::Tracker);
    }
    for addr in [addr1, addr2] {
        ps.mark_connecting(addr);
    }

    // Five seconds in the past, or "now" if the clock cannot go back that far.
    let shared_ts = Instant::now()
        .checked_sub(Duration::from_secs(5))
        .unwrap_or_else(Instant::now);
    for addr in [addr1, addr2] {
        ps.test_backdate_connecting_since(addr, shared_ts);
    }

    // Scope the index guard so it is released before `soft_reap_candidates` runs.
    {
        let index = ps.connecting_since_index.lock();
        let bucket = index.get(&shared_ts).expect("collision bucket exists");
        assert_eq!(bucket.len(), 2, "both addrs collide into one bucket");
        assert!(bucket.contains(&addr1));
        assert!(bucket.contains(&addr2));
    }

    let candidates = ps.soft_reap_candidates(Duration::from_secs(3));
    assert_eq!(candidates.len(), 2);
    assert!(candidates.contains(&addr1));
    assert!(candidates.contains(&addr2));
}
/// Puts 100 peers into connecting, backdates the first 50 by ten seconds, and
/// expects `soft_reap_candidates` with a 5-second timeout to return exactly
/// those 50 and none of the remaining ones.
#[test]
fn soft_reap_short_circuits_on_first_unexpired() {
    let (ps, _rx) = make_peer_states();

    let peers: Vec<_> = (0u8..100)
        .map(|i| {
            let addr = test_addr_ip(i, 6881);
            ps.add_if_not_seen(addr, PeerSource::Tracker);
            ps.mark_connecting(addr);
            addr
        })
        .collect();
    // The first half gets backdated below; the second half keeps its timestamp.
    let (expired, fresh) = peers.split_at(50);

    let backdated = Instant::now()
        .checked_sub(Duration::from_secs(10))
        .unwrap_or_else(Instant::now);
    for &addr in expired {
        ps.test_backdate_connecting_since(addr, backdated);
    }

    let candidates = ps.soft_reap_candidates(Duration::from_secs(5));
    assert_eq!(candidates.len(), 50, "exactly 50 expired peers");
    for addr in expired {
        assert!(candidates.contains(addr), "expired addr {addr:?} missing");
    }
    for addr in fresh {
        assert!(
            !candidates.contains(addr),
            "fresh addr {addr:?} must not be reaped"
        );
    }
}
/// With a timeout of `Duration::MAX`, `soft_reap_candidates` is expected to
/// return nothing for a freshly connecting peer.
#[test]
fn soft_reap_handles_max_timeout() {
    let (ps, _rx) = make_peer_states();
    let peer = test_addr(6881);

    ps.add_if_not_seen(peer, PeerSource::Tracker);
    ps.mark_connecting(peer);

    let reaped = ps.soft_reap_candidates(Duration::MAX);
    assert!(
        reaped.is_empty(),
        "Duration::MAX must not yield any reap candidates"
    );
}
/// Indexes 500 connecting peers, backdates the first 250 by ten seconds, and
/// expects a 5-second soft reap to report exactly 250 candidates.
///
/// Afterwards every peer is marked live, and the index is expected to drain to
/// zero addresses and zero buckets.
#[test]
fn soft_reap_stress_500_peers() {
    let (ps, _rx) = make_peer_states();

    let mut addrs = Vec::with_capacity(500);
    for i in 0u16..500 {
        // Big-endian bytes of the counter are (i / 256, i % 256), giving each
        // peer a distinct 10.<hi>.<lo>.1 address.
        let [octet1, octet2] = i.to_be_bytes();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, octet1, octet2, 1)), 6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        addrs.push(addr);
    }
    assert_eq!(idx_total_addrs(&ps), 500);

    let backdated = Instant::now()
        .checked_sub(Duration::from_secs(10))
        .unwrap_or_else(Instant::now);
    for &addr in addrs.iter().take(250) {
        ps.test_backdate_connecting_since(addr, backdated);
    }

    let reap_count = ps.soft_reap_candidates(Duration::from_secs(5)).len();
    assert_eq!(reap_count, 250, "250 backdated peers must reap");

    for &addr in &addrs {
        ps.mark_live(addr);
    }

    let remaining_addrs = idx_total_addrs(&ps);
    assert_eq!(
        remaining_addrs, 0,
        "leak canary: index must drain to 0 after all peers go Live"
    );
    let remaining_buckets = idx_buckets(&ps);
    assert_eq!(
        remaining_buckets, 0,
        "leak canary: index buckets must all be pruned"
    );
}
/// Test helper: panics unless `connecting_since_index` and `states` agree.
///
/// Two directions are checked:
///
/// 1. Every address stored in the index has a `states` entry that is
///    `Connecting`, has no `tcp_connected_at`, and whose `connecting_since`
///    equals the key of the bucket it sits in.
/// 2. Every `states` entry that is `Connecting` without `tcp_connected_at`
///    appears in the index bucket keyed by its `connecting_since`.
fn assert_index_dashmap_invariant(ps: &PeerStates) {
    // Snapshot the index as flat (bucket key, address) pairs. The index guard is
    // a temporary of this statement, so it is released before `states` is read.
    let indexed: Vec<(Instant, SocketAddr)> = ps
        .connecting_since_index
        .lock()
        .iter()
        .flat_map(|(ts, bucket)| bucket.iter().map(move |addr| (*ts, *addr)))
        .collect();

    // Direction 1: index -> states.
    for &(ts, addr) in &indexed {
        let entry = ps
            .states
            .get(&addr)
            .unwrap_or_else(|| panic!("indexed addr {addr:?} missing from DashMap"));
        assert_eq!(
            entry.state,
            PeerLifecycle::Connecting,
            "indexed addr {addr:?} has state {:?}, expected Connecting",
            entry.state,
        );
        assert!(
            entry.tcp_connected_at.is_none(),
            "indexed addr {addr:?} has tcp_connected_at set",
        );
        assert_eq!(
            entry.connecting_since,
            Some(ts),
            "indexed addr {addr:?} bucket-ts {ts:?} mismatch with entry connecting_since",
        );
    }

    // Direction 2: states -> index.
    for entry in ps.states.iter() {
        let must_be_indexed =
            entry.state == PeerLifecycle::Connecting && entry.tcp_connected_at.is_none();
        if !must_be_indexed {
            continue;
        }

        let addr = *entry.key();
        let ts = entry
            .connecting_since
            .expect("Connecting must have connecting_since");
        // Release the per-entry reference before taking the index lock.
        drop(entry);

        let index = ps.connecting_since_index.lock();
        let bucket = index
            .get(&ts)
            .unwrap_or_else(|| panic!("Connecting addr {addr:?} missing from index"));
        assert!(
            bucket.contains(&addr),
            "Connecting addr {addr:?} not in bucket at ts {ts:?}",
        );
    }
}
/// Maps a small pool index to a fixed test address: `192.0.2.<i>` on port 6881.
///
/// The `192.0.2.0/24` block is TEST-NET-1 (RFC 5737), reserved for
/// documentation and examples.
fn pool_addr(i: u8) -> SocketAddr {
    let ip = IpAddr::from(Ipv4Addr::new(192, 0, 2, i));
    SocketAddr::new(ip, 6881)
}
/// One step of the randomized transition test.
///
/// Each variant names the `PeerStates` call to perform; the `u8` payload is
/// the pool index that `pool_addr` turns into the target address.
#[derive(Clone, Copy, Debug)]
enum Op {
    /// Call `add_if_not_seen` with `PeerSource::Tracker`.
    Add(u8),
    /// Call `mark_connecting`.
    MarkConnecting(u8),
    /// Call `set_tcp_connected`.
    SetTcpConnected(u8),
    /// Call `mark_live`.
    MarkLive(u8),
    /// Call `mark_dead`, ignoring its return value.
    MarkDead(u8),
    /// Call `mark_queued_for_retry`.
    MarkQueuedForRetry(u8),
}
use proptest::prelude::*;
proptest! {
    #![proptest_config(ProptestConfig {
        cases: 64,
        ..ProptestConfig::default()
    })]

    // Applies between 1 and 199 random operations to a pool of eight
    // addresses (indices 0..8) and re-checks the index/states invariant after
    // every single step.
    #[test]
    fn index_dashmap_invariant_under_random_transitions(
        ops in proptest::collection::vec(
            prop_oneof![
                (0u8..8u8).prop_map(Op::Add),
                (0u8..8u8).prop_map(Op::MarkConnecting),
                (0u8..8u8).prop_map(Op::SetTcpConnected),
                (0u8..8u8).prop_map(Op::MarkLive),
                (0u8..8u8).prop_map(Op::MarkDead),
                (0u8..8u8).prop_map(Op::MarkQueuedForRetry),
            ],
            1..200,
        )
    ) {
        let (ps, _rx) = make_peer_states();
        for op in ops {
            // Dispatch and address selection in one step: each arm resolves
            // its pool index and performs the matching call.
            match op {
                Op::Add(slot) => {
                    ps.add_if_not_seen(pool_addr(slot), PeerSource::Tracker);
                }
                Op::MarkConnecting(slot) => {
                    ps.mark_connecting(pool_addr(slot));
                }
                Op::SetTcpConnected(slot) => {
                    ps.set_tcp_connected(pool_addr(slot));
                }
                Op::MarkLive(slot) => {
                    ps.mark_live(pool_addr(slot));
                }
                Op::MarkDead(slot) => {
                    let _ = ps.mark_dead(pool_addr(slot));
                }
                Op::MarkQueuedForRetry(slot) => {
                    ps.mark_queued_for_retry(pool_addr(slot));
                }
            }
            assert_index_dashmap_invariant(&ps);
        }
    }
}
}